using System.Buffers;
using System.IO.Pipelines;
using System.Threading;
namespace AyCode.Services.Server.Tests.SignalRs;
///
/// Pipe-based test transport for AsyncSegment protocol path.
///
/// Unlike (IBufferWriter only), this exposes a real
/// , so AcBinaryHubProtocol.WriteMessage can enter
/// AsyncSegment chunked mode (output is PipeWriter check).
///
/// The internal uses a slab-like memory pool with fixed segment size,
/// random offsets and size jitter to better simulate transport behavior.
///
internal sealed class AsyncSegmentPipeTransportWriter : IDisposable
{
private readonly Pipe _pipe;
private bool _disposed;
private bool _writerCompleted;
public AsyncSegmentPipeTransportWriter(int segmentSize = 256, int seed = 42)
{
if (segmentSize <= 0)
throw new ArgumentOutOfRangeException(nameof(segmentSize));
_pipe = new Pipe(new PipeOptions(
pool: new SlabSimulatingPool(segmentSize, seed),
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 0,
resumeWriterThreshold: 0,
minimumSegmentSize: segmentSize,
useSynchronizationContext: false));
}
///
/// Gets the PipeWriter that must be passed to protocol WriteMessage
/// to activate AsyncSegment chunked write path.
///
public PipeWriter Writer => _pipe.Writer;
///
/// Gets the paired PipeReader for test-side inspection and parsing.
///
public PipeReader Reader => _pipe.Reader;
///
/// Completes only the writer side of the internal pipe.
/// Reader remains open so tests can continue draining buffered data.
///
public void CompleteWriter()
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (_writerCompleted)
return;
_writerCompleted = true;
_pipe.Writer.Complete();
}
///
/// Drains all currently available bytes from the reader into a contiguous array.
/// Does not require completing the writer.
///
public byte[] DrainAvailableBytes()
{
ObjectDisposedException.ThrowIf(_disposed, this);
var output = new ArrayBufferWriter();
while (_pipe.Reader.TryRead(out var result))
{
var buffer = result.Buffer;
foreach (var segment in buffer)
output.Write(segment.Span);
_pipe.Reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
}
return output.WrittenSpan.ToArray();
}
///
/// Asynchronously drains all data until the writer is completed and the pipe is exhausted.
///
public async Task DrainAllAsync(CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(_disposed, this);
var output = new ArrayBufferWriter();
while (true)
{
var result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
var buffer = result.Buffer;
foreach (var segment in buffer)
output.Write(segment.Span);
_pipe.Reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
}
return output.WrittenSpan.ToArray();
}
///
/// Completes writer and reader sides of the internal pipe.
///
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
if (!_writerCompleted)
{
_writerCompleted = true;
_pipe.Writer.Complete();
}
_pipe.Reader.Complete();
}
private sealed class SlabSimulatingPool : MemoryPool
{
private readonly int _segmentSize;
private readonly Random _rng;
public SlabSimulatingPool(int segmentSize, int seed)
{
_segmentSize = segmentSize;
_rng = new Random(seed);
}
public override int MaxBufferSize => _segmentSize;
public override IMemoryOwner Rent(int minBufferSize = -1)
{
var requested = minBufferSize > 0 ? minBufferSize : _segmentSize;
var size = Math.Max(requested, _segmentSize);
var offset = _rng.Next(0, Math.Max(1, _segmentSize));
var jitter = _rng.Next(-_segmentSize / 4, _segmentSize / 4 + 1);
var actualSize = Math.Max(16, size + jitter);
var array = new byte[actualSize + offset];
return new Owner(array, offset, actualSize);
}
protected override void Dispose(bool disposing)
{
}
private sealed class Owner(byte[] array, int offset, int length) : IMemoryOwner
{
public Memory Memory { get; } = array.AsMemory(offset, length);
public void Dispose()
{
}
}
}
}