using System.Buffers; using System.IO.Pipelines; using System.Threading; namespace AyCode.Services.Server.Tests.SignalRs; /// /// Pipe-based test transport for AsyncSegment protocol path. /// /// Unlike (IBufferWriter only), this exposes a real /// , so AcBinaryHubProtocol.WriteMessage can enter /// AsyncSegment chunked mode (output is PipeWriter check). /// /// The internal uses a slab-like memory pool with fixed segment size, /// random offsets and size jitter to better simulate transport behavior. /// internal sealed class AsyncSegmentPipeTransportWriter : IDisposable { private readonly Pipe _pipe; private bool _disposed; private bool _writerCompleted; public AsyncSegmentPipeTransportWriter(int segmentSize = 256, int seed = 42) { if (segmentSize <= 0) throw new ArgumentOutOfRangeException(nameof(segmentSize)); _pipe = new Pipe(new PipeOptions( pool: new SlabSimulatingPool(segmentSize, seed), readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, pauseWriterThreshold: 0, resumeWriterThreshold: 0, minimumSegmentSize: segmentSize, useSynchronizationContext: false)); } /// /// Gets the PipeWriter that must be passed to protocol WriteMessage /// to activate AsyncSegment chunked write path. /// public PipeWriter Writer => _pipe.Writer; /// /// Gets the paired PipeReader for test-side inspection and parsing. /// public PipeReader Reader => _pipe.Reader; /// /// Completes only the writer side of the internal pipe. /// Reader remains open so tests can continue draining buffered data. /// public void CompleteWriter() { ObjectDisposedException.ThrowIf(_disposed, this); if (_writerCompleted) return; _writerCompleted = true; _pipe.Writer.Complete(); } /// /// Drains all currently available bytes from the reader into a contiguous array. /// Does not require completing the writer. /// public byte[] DrainAvailableBytes() { ObjectDisposedException.ThrowIf(_disposed, this); var output = new ArrayBufferWriter(); while (_pipe.Reader.TryRead(out var result)) { var buffer = result.Buffer; foreach (var segment in buffer) output.Write(segment.Span); _pipe.Reader.AdvanceTo(buffer.End); if (result.IsCompleted) break; } return output.WrittenSpan.ToArray(); } /// /// Asynchronously drains all data until the writer is completed and the pipe is exhausted. /// public async Task DrainAllAsync(CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_disposed, this); var output = new ArrayBufferWriter(); while (true) { var result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); var buffer = result.Buffer; foreach (var segment in buffer) output.Write(segment.Span); _pipe.Reader.AdvanceTo(buffer.End); if (result.IsCompleted) break; } return output.WrittenSpan.ToArray(); } /// /// Completes writer and reader sides of the internal pipe. /// public void Dispose() { if (_disposed) return; _disposed = true; if (!_writerCompleted) { _writerCompleted = true; _pipe.Writer.Complete(); } _pipe.Reader.Complete(); } private sealed class SlabSimulatingPool : MemoryPool { private readonly int _segmentSize; private readonly Random _rng; public SlabSimulatingPool(int segmentSize, int seed) { _segmentSize = segmentSize; _rng = new Random(seed); } public override int MaxBufferSize => _segmentSize; public override IMemoryOwner Rent(int minBufferSize = -1) { var requested = minBufferSize > 0 ? minBufferSize : _segmentSize; var size = Math.Max(requested, _segmentSize); var offset = _rng.Next(0, Math.Max(1, _segmentSize)); var jitter = _rng.Next(-_segmentSize / 4, _segmentSize / 4 + 1); var actualSize = Math.Max(16, size + jitter); var array = new byte[actualSize + offset]; return new Owner(array, offset, actualSize); } protected override void Dispose(bool disposing) { } private sealed class Owner(byte[] array, int offset, int length) : IMemoryOwner { public Memory Memory { get; } = array.AsMemory(offset, length); public void Dispose() { } } } }