diff --git a/.claude/settings.local.json b/.claude/settings.local.json
index df63b95..45c735b 100644
--- a/.claude/settings.local.json
+++ b/.claude/settings.local.json
@@ -52,7 +52,11 @@
"Bash(dotnet --version)",
"WebSearch",
"Bash(dotnet script:*)",
- "Bash(xargs wc:*)"
+ "Bash(xargs wc:*)",
+ "PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Core\\\\AyCode.Core.csproj\" --no-restore 2>&1)",
+ "PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Services\\\\AyCode.Services.csproj\" --no-restore 2>&1)",
+ "PowerShell(Remove-Item \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Core\\\\Serializers\\\\Binaries\\\\PipeReaderBinaryInput.cs\" -Confirm:$false)",
+ "PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Services.Server.Tests\\\\AyCode.Services.Server.Tests.csproj\" --no-restore 2>&1)"
]
}
}
diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
index 224cad1..a1c8128 100644
--- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
+++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
@@ -292,17 +292,13 @@ public static partial class AcBinaryDeserializer
}
///
- /// Deserialize from PipeReader with segment streaming (read per chunk via PipeReaderBinaryInput).
- /// Data is consumed as it arrives from the network, enabling pipeline parallelism.
+ /// Deserialize from a with streaming pipeline parallelism.
+ /// The producer thread writes chunk data via ,
+ /// while this method (running on a background thread) deserializes incrementally,
+ /// blocking on when data is exhausted.
///
- public static T? Deserialize(System.IO.Pipelines.PipeReader pipeReader, AcBinarySerializerOptions options)
- => DeserializeSequence(new PipeReaderBinaryInput(pipeReader), typeof(T), options);
-
- ///
- /// Deserialize from PipeReader to specified type with segment streaming.
- ///
- public static object? Deserialize(System.IO.Pipelines.PipeReader pipeReader, Type targetType, AcBinarySerializerOptions options)
- => DeserializeSequence(new PipeReaderBinaryInput(pipeReader), targetType, options);
+ public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options)
+ => DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options);
///
/// Internal: Deserialize with any TInput (multi-segment or other future input types).
diff --git a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs
deleted file mode 100644
index 516e35d..0000000
--- a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs
+++ /dev/null
@@ -1,236 +0,0 @@
-using System;
-using System.Buffers;
-using System.IO.Pipelines;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-
-namespace AyCode.Core.Serializers.Binaries;
-
-///
-/// Binary input that reads from a PipeReader, requesting more data when the current buffer is exhausted.
-///
-/// Mirrors SequenceBinaryInput's segment iteration and cross-boundary handling, but instead of
-/// reading from a fixed ReadOnlySequence, it calls PipeReader.ReadAsync() to get more data
-/// as chunks arrive from the network.
-///
-/// When the writer flushes per chunk (AsyncPipeWriterOutput), ReadAsync() returns as soon as
-/// data is available — typically completing synchronously. This enables pipeline parallelism:
-/// serialization, network transfer, and deserialization overlap.
-///
-public struct PipeReaderBinaryInput : IBinaryInputBase
-{
- private readonly PipeReader _pipeReader;
- private ReadOnlySequence _currentBuffer;
- private SequencePosition _nextSegmentPosition;
- private SequencePosition _consumedUpTo;
- private bool _pipeCompleted;
- private bool _hasActiveRead;
-
- // Cross-boundary scratch — same pattern as SequenceBinaryInput
- private byte[]? _scratchBuffer;
- private bool _afterCrossBoundary;
- private byte[]? _savedBuffer;
- private int _savedPosition;
- private int _savedBufferLength;
-
- ///
- /// Synchronously gets the result of a PipeReader.ReadAsync ValueTask.
- /// Fast-path: if already completed (data in pipe), returns directly without Task allocation.
- /// Slow-path: converts to Task for proper blocking when waiting for writer.
- ///
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static ReadResult SyncReadResult(ValueTask vt)
- => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
-
- public PipeReaderBinaryInput(PipeReader pipeReader)
- {
- _pipeReader = pipeReader;
- _currentBuffer = default;
- _nextSegmentPosition = default;
- _consumedUpTo = default;
- _pipeCompleted = false;
- _hasActiveRead = false;
- _scratchBuffer = null;
- _afterCrossBoundary = false;
- _savedBuffer = null;
- _savedPosition = 0;
- _savedBufferLength = 0;
- }
-
- ///
- /// Reads the first data from the PipeReader and provides the first segment's buffer.
- ///
- public void Initialize(out byte[] buffer, out int position, out int bufferLength)
- {
- var result = SyncReadResult(_pipeReader.ReadAsync());
- _currentBuffer = result.Buffer;
- _pipeCompleted = result.IsCompleted;
- _hasActiveRead = true;
- _consumedUpTo = _currentBuffer.Start;
- _nextSegmentPosition = _currentBuffer.Start;
-
- if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0)
- throw new AcBinaryDeserializationException("Empty pipe — no data to read.");
-
- _consumedUpTo = _nextSegmentPosition; // Mark first segment as consumed (same as TryLoadNextSegmentFromBuffer)
- ExtractArray(memory, out buffer, out position, out bufferLength);
- }
-
- ///
- /// Advances to the next segment. If the current ReadResult buffer is exhausted,
- /// calls PipeReader.ReadAsync() to get more data from the pipe.
- ///
- [MethodImpl(MethodImplOptions.NoInlining)]
- public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
- {
- // After cross-boundary scratch read: restore to the last touched segment
- if (_afterCrossBoundary)
- {
- _afterCrossBoundary = false;
- buffer = _savedBuffer!;
- position = _savedPosition;
- bufferLength = _savedBufferLength;
-
- if (bufferLength - position >= needed)
- return true;
- }
-
- var remaining = bufferLength - position;
-
- if (remaining > 0 && remaining < needed)
- return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining);
-
- // Try next segment in current buffer
- if (TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength))
- {
- remaining = bufferLength - position;
- if (remaining >= needed) return true;
- return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining);
- }
-
- // Current buffer exhausted — request more data from PipeReader
- if (_pipeCompleted)
- return false;
-
- _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
- _hasActiveRead = false;
-
- var result = SyncReadResult(_pipeReader.ReadAsync());
- _currentBuffer = result.Buffer;
- _pipeCompleted = result.IsCompleted;
- _hasActiveRead = true;
- _consumedUpTo = _currentBuffer.Start;
- _nextSegmentPosition = _currentBuffer.Start;
-
- if (!TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength))
- return false;
-
- remaining = bufferLength - position;
- if (remaining >= needed) return true;
- return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining);
- }
-
- ///
- /// Returns the scratch buffer and signals the PipeReader that all data has been consumed.
- ///
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void Release()
- {
- if (_scratchBuffer != null)
- {
- ArrayPool.Shared.Return(_scratchBuffer);
- _scratchBuffer = null;
- }
-
- if (_hasActiveRead)
- {
- _pipeReader.AdvanceTo(_currentBuffer.End);
- _hasActiveRead = false;
- }
- }
-
- private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength)
- {
- if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0)
- return false;
-
- ExtractArray(memory, out buffer, out position, out bufferLength);
- _consumedUpTo = _nextSegmentPosition;
- return true;
- }
-
- private bool TryReadCrossBoundary(ref byte[] buffer, ref int position, ref int bufferLength, int needed, int remaining)
- {
- if (_scratchBuffer == null || _scratchBuffer.Length < needed)
- {
- if (_scratchBuffer != null)
- ArrayPool.Shared.Return(_scratchBuffer);
- _scratchBuffer = ArrayPool.Shared.Rent(needed);
- }
-
- // 1) Copy tail of current segment
- Buffer.BlockCopy(buffer, position, _scratchBuffer, 0, remaining);
- var filled = remaining;
-
- // 2) Copy from subsequent segments (may span multiple segments or pipe reads)
- while (filled < needed)
- {
- // Try next segment in current buffer
- if (_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) && memory.Length > 0)
- {
- ExtractArray(memory, out var segArray, out var segOffset, out var segBufferLength);
- _consumedUpTo = _nextSegmentPosition;
- var segCount = segBufferLength - segOffset;
- var take = Math.Min(needed - filled, segCount);
- Buffer.BlockCopy(segArray, segOffset, _scratchBuffer, filled, take);
- filled += take;
-
- _savedBuffer = segArray;
- _savedPosition = segOffset + take;
- _savedBufferLength = segBufferLength;
- continue;
- }
-
- // Current buffer exhausted — read more from pipe
- if (_pipeCompleted)
- return false;
-
- _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
- _hasActiveRead = false;
- var result = SyncReadResult(_pipeReader.ReadAsync());
- _currentBuffer = result.Buffer;
- _pipeCompleted = result.IsCompleted;
- _hasActiveRead = true;
- _consumedUpTo = _currentBuffer.Start;
- _nextSegmentPosition = _currentBuffer.Start;
-
- if (_currentBuffer.IsEmpty && _pipeCompleted)
- return false;
- }
-
- buffer = _scratchBuffer;
- position = 0;
- bufferLength = filled;
- _afterCrossBoundary = true;
- return true;
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static void ExtractArray(ReadOnlyMemory memory, out byte[] buffer, out int position, out int bufferLength)
- {
- if (MemoryMarshal.TryGetArray(memory, out var segment))
- {
- buffer = segment.Array!;
- position = segment.Offset;
- bufferLength = segment.Offset + segment.Count;
- }
- else
- {
- var temp = new byte[memory.Length];
- memory.Span.CopyTo(temp);
- buffer = temp;
- position = 0;
- bufferLength = temp.Length;
- }
- }
-}
diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs
new file mode 100644
index 0000000..d78f3b7
--- /dev/null
+++ b/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs
@@ -0,0 +1,226 @@
+using System;
+using System.Buffers;
+using System.Diagnostics;
+using System.Threading;
+using Microsoft.Extensions.Logging;
+
+namespace AyCode.Core.Serializers.Binaries;
+
+///
+/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization.
+///
+/// Replaces for the AsyncSegment read path:
+/// the main thread writes incoming chunk data via , while a background
+/// deserialization thread reads through which blocks
+/// on when data is exhausted.
+///
+/// Backed by a single contiguous byte[] from .
+/// Positions reset to 0 when the consumer catches up (zero-cost reuse).
+/// Grow is the absolute last resort (single Rent, practically never happens).
+///
+/// Thread-safety:
+///
+/// - _writePos: written by producer (Volatile.Write), read by consumer (Volatile.Read).
+/// - _readPos: written by consumer (Volatile.Write), read by producer (Volatile.Read).
+/// - Reset-to-0 happens in only when _readPos == _writePos
+/// (consumer is blocked in TryAdvanceSegment, not reading the buffer).
+/// - Grow happens in only when reset is insufficient
+/// (consumer is behind). Old buffer kept for consumer's local reference;
+/// picks up new buffer.
+///
+///
+public sealed class SegmentBufferReader : IDisposable
+{
+ private byte[] _buffer;
+ private int _writePos;
+ private int _readPos; // consumer reports consumed position here
+ private bool _completed;
+
+ private readonly ManualResetEventSlim _dataAvailable;
+ private readonly ILogger? _logger;
+
+ // After grow: ALL old buffers are kept alive until Dispose.
+ // Cannot return them to the pool mid-operation because the consumer thread
+ // may hold a local reference to any of them (its local 'buffer' variable is
+ // only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind).
+ private byte[][]? _oldBuffers;
+ private int _oldBufferCount;
+
+ ///
+ /// Creates a new SegmentBufferReader with the specified initial capacity.
+ /// Typical value: chunkSize * 2 (e.g. 8192 for 4096-byte chunks).
+ ///
+ /// Initial buffer size. Rounded up by ArrayPool.
+ /// Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds.
+ public SegmentBufferReader(int initialCapacity, ILogger? logger = null)
+ {
+ if (initialCapacity <= 0)
+ throw new ArgumentOutOfRangeException(nameof(initialCapacity));
+
+ _buffer = ArrayPool.Shared.Rent(initialCapacity);
+ _dataAvailable = new ManualResetEventSlim(false);
+ _logger = logger;
+ }
+
+ // --- Producer API (main thread) ---
+
+ ///
+ /// Appends chunk data to the buffer.
+ /// Resets positions to 0 when consumer has caught up (zero-cost).
+ /// Grows as last resort if consumer is behind and buffer is full.
+ /// Signals the consumer thread that new data is available.
+ ///
+ public void Write(ReadOnlySpan data)
+ {
+ if (data.IsEmpty) return;
+
+ // If consumer consumed everything → reset positions to 0 (zero-cost reuse)
+ var rp = Volatile.Read(ref _readPos);
+ if (rp > 0 && rp == _writePos)
+ {
+ DebugLog("Write reset positions rp={Rp} wp={Wp} → 0", rp, _writePos);
+ _writePos = 0;
+ Volatile.Write(ref _readPos, 0);
+ }
+
+ // Grow if buffer can't fit the new data (rare — consumer typically keeps pace)
+ if (_writePos + data.Length > _buffer.Length)
+ {
+ DebugLog("Write grow required wp={Wp} dataLen={DataLen} bufLen={BufLen}",
+ _writePos, data.Length, _buffer.Length);
+ Grow(_writePos + data.Length);
+ }
+
+ data.CopyTo(_buffer.AsSpan(_writePos));
+ var newWritePos = _writePos + data.Length;
+ Volatile.Write(ref _writePos, newWritePos);
+ _dataAvailable.Set();
+
+ DebugLog("Write dataLen={DataLen} newWritePos={NewWritePos} readPos={ReadPos}",
+ data.Length, newWritePos, Volatile.Read(ref _readPos));
+ }
+
+ ///
+ /// Signals that no more data will be written (CHUNK_END received).
+ /// The consumer's will return false
+ /// once all buffered data is consumed.
+ ///
+ public void Complete()
+ {
+ Volatile.Write(ref _completed, true);
+ _dataAvailable.Set();
+
+ DebugLog("Complete writePos={Wp} readPos={Rp}",
+ Volatile.Read(ref _writePos), Volatile.Read(ref _readPos));
+ }
+
+ // --- Consumer API (deser thread, used by SegmentBufferReaderInput) ---
+
+ /// Current buffer array. May change after grow — consumer must re-read in TryAdvanceSegment.
+ internal byte[] Buffer => _buffer;
+
+ /// Current write position. All bytes in [ReadPos..WritePos) are valid.
+ internal int WritePos => Volatile.Read(ref _writePos);
+
+ /// Consumer's last reported read position.
+ internal int ReadPos => Volatile.Read(ref _readPos);
+
+ /// True after is called.
+ internal bool IsCompleted => Volatile.Read(ref _completed);
+
+ ///
+ /// Called by consumer to report how far it has read.
+ /// Enables the producer to reset positions to 0 when everything is consumed.
+ ///
+ internal void SetReadPos(int position) => Volatile.Write(ref _readPos, position);
+
+ /// Blocks until new data is written or is called.
+ internal void WaitForData() => _dataAvailable.Wait();
+
+ ///
+ /// Resets the signal for the double-check pattern:
+ /// ResetSignal() → check condition → if false, WaitForData().
+ ///
+ internal void ResetSignal() => _dataAvailable.Reset();
+
+ // --- Lifecycle ---
+
+ public void Dispose()
+ {
+ // Return all old buffers accumulated from grows
+ if (_oldBuffers != null)
+ {
+ for (var i = 0; i < _oldBufferCount; i++)
+ {
+ ArrayPool.Shared.Return(_oldBuffers[i]);
+ _oldBuffers[i] = null!;
+ }
+ _oldBuffers = null;
+ _oldBufferCount = 0;
+ }
+
+ // Return current buffer
+ if (_buffer != null!)
+ {
+ ArrayPool.Shared.Return(_buffer);
+ _buffer = null!;
+ }
+
+ _dataAvailable.Dispose();
+ }
+
+ // --- Internal ---
+
+ private void Grow(int requiredCapacity)
+ {
+ var newSize = Math.Max(_buffer.Length * 2, requiredCapacity);
+ var newBuffer = ArrayPool.Shared.Rent(newSize);
+ System.Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos);
+
+ // Keep the current buffer alive — consumer's local 'buffer' variable may still reference it
+ // (consumer may lag multiple grows behind before calling TryAdvanceSegment).
+ // Returning old buffers to the pool mid-operation would cause use-after-free
+ // if another pool user overwrites them while the consumer is still reading.
+ if (_oldBuffers == null)
+ _oldBuffers = new byte[4][];
+ else if (_oldBufferCount == _oldBuffers.Length)
+ Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2);
+
+ _oldBuffers[_oldBufferCount++] = _buffer;
+ _buffer = newBuffer;
+ }
+
+ // --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) ---
+
+ ///
+ /// Emits a debug log if the logger is attached and Debug level is enabled.
+ /// Compiled out entirely in RELEASE builds via .
+ ///
+ [Conditional("DEBUG")]
+ internal void DebugLog(string message)
+ {
+ if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
+ _logger.LogDebug("SegmentBufferReader " + message);
+ }
+
+ [Conditional("DEBUG")]
+ private void DebugLog(string template, object? arg0)
+ {
+ if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
+ _logger.LogDebug("SegmentBufferReader " + template, arg0);
+ }
+
+ [Conditional("DEBUG")]
+ private void DebugLog(string template, object? arg0, object? arg1)
+ {
+ if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
+ _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1);
+ }
+
+ [Conditional("DEBUG")]
+ private void DebugLog(string template, object? arg0, object? arg1, object? arg2)
+ {
+ if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
+ _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1, arg2);
+ }
+}
diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs
new file mode 100644
index 0000000..b42fe7b
--- /dev/null
+++ b/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs
@@ -0,0 +1,120 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace AyCode.Core.Serializers.Binaries;
+
+///
+/// Binary input that reads from a for chunked streaming deserialization.
+///
+/// Replaces PipeReaderBinaryInput: instead of blocking on PipeReader.ReadAsync(),
+/// blocks on when data is exhausted. Much simpler because
+/// the buffer is a single contiguous byte[] — no multi-segment iteration, no cross-boundary
+/// scratch buffers.
+///
+/// The deserialization context's hot path reads directly from the buffer array using local
+/// buffer/position/bufferLength variables.
+/// is only called when position >= bufferLength (cold path), at which point it reports
+/// the consumed position via , then either
+/// provides more data or blocks until data arrives.
+///
+/// Position reset: when the producer detects readPos == writePos (all consumed),
+/// it resets both to 0. After waking from Wait, this input re-reads the adjusted positions.
+///
+public struct SegmentBufferReaderInput : IBinaryInputBase
+{
+ private readonly SegmentBufferReader _reader;
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public SegmentBufferReaderInput(SegmentBufferReader reader)
+ {
+ _reader = reader;
+ }
+
+ ///
+ /// Provides the initial buffer state. Called once before deserialization begins.
+ /// Task.Run starts after the first Write() (lazy start in TryParseChunkData),
+ /// so data is already available — no wait needed.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void Initialize(out byte[] buffer, out int position, out int bufferLength)
+ {
+ buffer = _reader.Buffer;
+ position = 0;
+ bufferLength = _reader.WritePos;
+ DebugLog("Input.Initialize bufferLength=" + bufferLength);
+ }
+
+ ///
+ /// Called when the deserialization context needs more bytes than currently available.
+ /// Reports consumed position to the producer, then blocks via
+ /// until enough data arrives or completion is signaled.
+ ///
+ /// Uses the double-check pattern to avoid missed signals:
+ /// Reset() → check → if still not enough, Wait().
+ ///
+ /// No cross-boundary handling needed — the buffer is a single contiguous byte[].
+ /// After grow, re-reads _reader.Buffer to get the new (larger) array.
+ /// After position reset (readPos/writePos set to 0 by producer), re-reads adjusted positions.
+ ///
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
+ {
+ DebugLog("Input.TryAdvanceSegment enter position=" + position + " bufferLength=" + bufferLength + " needed=" + needed);
+
+ // Report how far we've consumed — enables producer to reset positions to 0
+ _reader.SetReadPos(position);
+
+ while (true)
+ {
+ // Re-read positions (may have been reset to 0 by producer)
+ int rp = _reader.ReadPos;
+ int wp = _reader.WritePos;
+
+ if (wp - rp >= needed)
+ {
+ buffer = _reader.Buffer; // may be new array after grow
+ position = rp; // may be 0 after reset
+ bufferLength = wp;
+ DebugLog("Input.TryAdvanceSegment return true (data available) position=" + position + " bufferLength=" + bufferLength);
+ return true;
+ }
+
+ if (_reader.IsCompleted)
+ {
+ // No more data will arrive. Return whatever is left.
+ if (wp > rp)
+ {
+ buffer = _reader.Buffer;
+ position = rp;
+ bufferLength = wp;
+ DebugLog("Input.TryAdvanceSegment return true (completed, partial) position=" + position + " bufferLength=" + bufferLength);
+ return true;
+ }
+ DebugLog("Input.TryAdvanceSegment return false (completed, empty)");
+ return false; // end of input
+ }
+
+ // Double-check pattern: Reset → verify → Wait
+ _reader.ResetSignal();
+
+ rp = _reader.ReadPos;
+ wp = _reader.WritePos;
+ if (wp - rp >= needed || _reader.IsCompleted)
+ continue; // re-check from top
+
+ DebugLog("Input.TryAdvanceSegment waiting (wp=" + wp + " rp=" + rp + " needed=" + needed + ")");
+ _reader.WaitForData(); // ManualResetEventSlim.Wait()
+ DebugLog("Input.TryAdvanceSegment woke up");
+ }
+ }
+
+ [Conditional("DEBUG")]
+ private void DebugLog(string message) => _reader.DebugLog(message);
+
+ ///
+ /// No-op. Buffer lifecycle is managed by .
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void Release() { }
+}
diff --git a/AyCode.Services.Server.Tests/SignalRs/AsyncSegmentPipeTransportWriter.cs b/AyCode.Services.Server.Tests/SignalRs/AsyncSegmentPipeTransportWriter.cs
new file mode 100644
index 0000000..6b03b58
--- /dev/null
+++ b/AyCode.Services.Server.Tests/SignalRs/AsyncSegmentPipeTransportWriter.cs
@@ -0,0 +1,171 @@
+using System.Buffers;
+using System.IO.Pipelines;
+using System.Threading;
+
+namespace AyCode.Services.Server.Tests.SignalRs;
+
+///
+/// Pipe-based test transport for AsyncSegment protocol path.
+///
+/// Unlike (IBufferWriter only), this exposes a real
+/// , so AcBinaryHubProtocol.WriteMessage can enter
+/// AsyncSegment chunked mode (output is PipeWriter check).
+///
+/// The internal uses a slab-like memory pool with fixed segment size,
+/// random offsets and size jitter to better simulate transport behavior.
+///
+internal sealed class AsyncSegmentPipeTransportWriter : IDisposable
+{
+ private readonly Pipe _pipe;
+ private bool _disposed;
+ private bool _writerCompleted;
+
+ public AsyncSegmentPipeTransportWriter(int segmentSize = 256, int seed = 42)
+ {
+ if (segmentSize <= 0)
+ throw new ArgumentOutOfRangeException(nameof(segmentSize));
+
+ _pipe = new Pipe(new PipeOptions(
+ pool: new SlabSimulatingPool(segmentSize, seed),
+ readerScheduler: PipeScheduler.Inline,
+ writerScheduler: PipeScheduler.Inline,
+ pauseWriterThreshold: 0,
+ resumeWriterThreshold: 0,
+ minimumSegmentSize: segmentSize,
+ useSynchronizationContext: false));
+ }
+
+ ///
+ /// Gets the PipeWriter that must be passed to protocol WriteMessage
+ /// to activate AsyncSegment chunked write path.
+ ///
+ public PipeWriter Writer => _pipe.Writer;
+
+ ///
+ /// Gets the paired PipeReader for test-side inspection and parsing.
+ ///
+ public PipeReader Reader => _pipe.Reader;
+
+ ///
+ /// Completes only the writer side of the internal pipe.
+ /// Reader remains open so tests can continue draining buffered data.
+ ///
+ public void CompleteWriter()
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ if (_writerCompleted)
+ return;
+
+ _writerCompleted = true;
+ _pipe.Writer.Complete();
+ }
+
+ ///
+ /// Drains all currently available bytes from the reader into a contiguous array.
+ /// Does not require completing the writer.
+ ///
+ public byte[] DrainAvailableBytes()
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ var output = new ArrayBufferWriter();
+
+ while (_pipe.Reader.TryRead(out var result))
+ {
+ var buffer = result.Buffer;
+ foreach (var segment in buffer)
+ output.Write(segment.Span);
+
+ _pipe.Reader.AdvanceTo(buffer.End);
+
+ if (result.IsCompleted)
+ break;
+ }
+
+ return output.WrittenSpan.ToArray();
+ }
+
+ ///
+ /// Asynchronously drains all data until the writer is completed and the pipe is exhausted.
+ ///
+ public async Task DrainAllAsync(CancellationToken cancellationToken = default)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ var output = new ArrayBufferWriter();
+
+ while (true)
+ {
+ var result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+ var buffer = result.Buffer;
+
+ foreach (var segment in buffer)
+ output.Write(segment.Span);
+
+ _pipe.Reader.AdvanceTo(buffer.End);
+
+ if (result.IsCompleted)
+ break;
+ }
+
+ return output.WrittenSpan.ToArray();
+ }
+
+ ///
+ /// Completes writer and reader sides of the internal pipe.
+ ///
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ _disposed = true;
+ if (!_writerCompleted)
+ {
+ _writerCompleted = true;
+ _pipe.Writer.Complete();
+ }
+ _pipe.Reader.Complete();
+ }
+
+ private sealed class SlabSimulatingPool : MemoryPool
+ {
+ private readonly int _segmentSize;
+ private readonly Random _rng;
+
+ public SlabSimulatingPool(int segmentSize, int seed)
+ {
+ _segmentSize = segmentSize;
+ _rng = new Random(seed);
+ }
+
+ public override int MaxBufferSize => _segmentSize;
+
+ public override IMemoryOwner Rent(int minBufferSize = -1)
+ {
+ var requested = minBufferSize > 0 ? minBufferSize : _segmentSize;
+ var size = Math.Max(requested, _segmentSize);
+
+ var offset = _rng.Next(0, Math.Max(1, _segmentSize));
+ var jitter = _rng.Next(-_segmentSize / 4, _segmentSize / 4 + 1);
+ var actualSize = Math.Max(16, size + jitter);
+
+ var array = new byte[actualSize + offset];
+ return new Owner(array, offset, actualSize);
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ }
+
+ private sealed class Owner(byte[] array, int offset, int length) : IMemoryOwner
+ {
+ public Memory Memory { get; } = array.AsMemory(offset, length);
+
+ public void Dispose()
+ {
+ }
+ }
+ }
+}
diff --git a/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs b/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs
index e0dc280..cf2a211 100644
--- a/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs
+++ b/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs
@@ -1,6 +1,7 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
+using AyCode.Core.Serializers.Binaries;
using AyCode.Services.SignalRs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol;
@@ -25,9 +26,12 @@ namespace AyCode.Services.Server.Tests.SignalRs;
internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol
{
private const int SegmentSize = 256;
+ private readonly BinaryProtocolMode _mode;
- public TestMultiSegmentProtocol()
+ public TestMultiSegmentProtocol(BinaryProtocolMode mode = BinaryProtocolMode.Bytes)
+ : base(AcBinarySerializerOptions.Default, mode)
{
+ _mode = mode;
Options.BufferWriterChunkSize = SegmentSize;
}
@@ -38,6 +42,9 @@ internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol
///
public ReadOnlyMemory GetMessageBytesMultiSegment(HubMessage message)
{
+ if (_mode == BinaryProtocolMode.AsyncSegment)
+ return GetMessageBytesAsyncSegment(message);
+
// ── Transport-double path (production simulation) ──────────────────
var transport = new SlabTransportWriter(SegmentSize);
WriteMessage(message, transport);
@@ -58,6 +65,18 @@ internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol
return transportBytes;
}
+ ///
+ /// AsyncSegment write side: uses a real PipeWriter transport so chunked protocol path activates.
+ ///
+ private ReadOnlyMemory GetMessageBytesAsyncSegment(HubMessage message)
+ {
+ using var transport = new AsyncSegmentPipeTransportWriter(SegmentSize);
+ WriteMessage(message, transport.Writer);
+ transport.CompleteWriter();
+ var bytes = transport.DrainAllAsync().GetAwaiter().GetResult();
+ return bytes;
+ }
+
///
/// Read side: fill Pipe with 256-byte slab segments → multi-segment ReadOnlySequence.
/// Same as production Kestrel PipeReader delivering slab-sized segments.
diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs
index d0264e0..e4d6f11 100644
--- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs
+++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs
@@ -19,18 +19,19 @@ public class TestableSignalRClient2 : AcSignalRClientBase, IAcSignalRHubItemServ
{
private HubConnectionState _connectionState = HubConnectionState.Connected;
private readonly TestableSignalRHub2 _signalRHub;
- private readonly TestMultiSegmentProtocol _protocol = new();
+ private readonly TestMultiSegmentProtocol _protocol;
private readonly TestInvocationBinder _binder = new();
///
/// Testable SignalR client that allows testing without real HubConnection.
///
- public TestableSignalRClient2(TestableSignalRHub2 signalRHub, TestLogger logger) : base(logger)
+ public TestableSignalRClient2(TestableSignalRHub2 signalRHub, TestLogger logger, BinaryProtocolMode mode = BinaryProtocolMode.Bytes) : base(logger)
{
MsDelay = 0;
MsFirstDelay = 0;
_signalRHub = signalRHub;
+ _protocol = new TestMultiSegmentProtocol(mode);
}
#region Override virtual methods for testing
diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs
index 02ee199..d93695c 100644
--- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs
+++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs
@@ -22,7 +22,7 @@ namespace AyCode.Services.Server.Tests.SignalRs;
public class TestableSignalRHub2 : AcWebSignalRHubBase
{
private IAcSignalRHubItemServer _callerClient;
- private readonly TestMultiSegmentProtocol _protocol = new();
+ private readonly TestMultiSegmentProtocol _protocol;
private readonly TestInvocationBinder _binder = new();
#region Test Configuration
@@ -49,14 +49,16 @@ public class TestableSignalRHub2 : AcWebSignalRHubBasePer-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).
- private readonly ConditionalWeakTable? _chunkStates;
+ ///
+ /// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).
+ /// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server.
+ ///
+ private readonly ConditionalWeakTable _chunkStates;
private sealed class AsyncChunkState
{
@@ -69,8 +72,16 @@ public class AcBinaryHubProtocol : IHubProtocol
public object?[] Args = null!;
public int StreamedArgIndex;
public Type StreamedArgType = null!;
- public Pipe InternalPipe = null!;
+ public SegmentBufferReader Buffer = null!;
public Task