From 4343ab4d537d2a6f1d367e86bb38f3d59de41629 Mon Sep 17 00:00:00 2001 From: Loretta Date: Sat, 18 Apr 2026 14:31:27 +0200 Subject: [PATCH] [LOADED_DOCS: .github\copilot-instructions.md] Refactor: replace PipeReaderBinaryInput with SegmentBufferReader - Remove PipeReaderBinaryInput and all related code. - Add SegmentBufferReader and SegmentBufferReaderInput for efficient, thread-safe chunked streaming deserialization. - Update AcBinaryDeserializer and AcBinaryHubProtocol to use the new buffer for async segment protocol, improving state management and background deserialization. - Enhance chunked protocol handling: skip re-presented chunk start frames, track consumed chunk frame bytes, and improve logging. - Update test infrastructure to support async segment protocol and add AsyncSegmentPipeTransportWriter for realistic testing. - Update settings.local.json to reflect new build/test commands and remove obsolete files. - Improves performance, reliability, and testability of chunked SignalR streaming. --- .claude/settings.local.json | 6 +- .../Binaries/AcBinaryDeserializer.cs | 16 +- .../Binaries/PipeReaderBinaryInput.cs | 236 ------------------ .../Binaries/SegmentBufferReader.cs | 226 +++++++++++++++++ .../Binaries/SegmentBufferReaderInput.cs | 120 +++++++++ .../AsyncSegmentPipeTransportWriter.cs | 171 +++++++++++++ .../SignalRs/TestMultiSegmentProtocol.cs | 21 +- .../SignalRs/TestableSignalRClient2.cs | 5 +- .../SignalRs/TestableSignalRHub2.cs | 8 +- .../SignalRs/AcBinaryHubProtocol.cs | 166 +++++++++--- 10 files changed, 687 insertions(+), 288 deletions(-) delete mode 100644 AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs create mode 100644 AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs create mode 100644 AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs create mode 100644 AyCode.Services.Server.Tests/SignalRs/AsyncSegmentPipeTransportWriter.cs diff --git a/.claude/settings.local.json b/.claude/settings.local.json index df63b95..45c735b 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -52,7 +52,11 @@ "Bash(dotnet --version)", "WebSearch", "Bash(dotnet script:*)", - "Bash(xargs wc:*)" + "Bash(xargs wc:*)", + "PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Core\\\\AyCode.Core.csproj\" --no-restore 2>&1)", + "PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Services\\\\AyCode.Services.csproj\" --no-restore 2>&1)", + "PowerShell(Remove-Item \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Core\\\\Serializers\\\\Binaries\\\\PipeReaderBinaryInput.cs\" -Confirm:$false)", + "PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Services.Server.Tests\\\\AyCode.Services.Server.Tests.csproj\" --no-restore 2>&1)" ] } } diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index 224cad1..a1c8128 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -292,17 +292,13 @@ public static partial class AcBinaryDeserializer } /// - /// Deserialize from PipeReader with segment streaming (read per chunk via PipeReaderBinaryInput). - /// Data is consumed as it arrives from the network, enabling pipeline parallelism. + /// Deserialize from a with streaming pipeline parallelism. + /// The producer thread writes chunk data via , + /// while this method (running on a background thread) deserializes incrementally, + /// blocking on when data is exhausted. /// - public static T? Deserialize(System.IO.Pipelines.PipeReader pipeReader, AcBinarySerializerOptions options) - => DeserializeSequence(new PipeReaderBinaryInput(pipeReader), typeof(T), options); - - /// - /// Deserialize from PipeReader to specified type with segment streaming. - /// - public static object? Deserialize(System.IO.Pipelines.PipeReader pipeReader, Type targetType, AcBinarySerializerOptions options) - => DeserializeSequence(new PipeReaderBinaryInput(pipeReader), targetType, options); + public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options) + => DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options); /// /// Internal: Deserialize with any TInput (multi-segment or other future input types). diff --git a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs deleted file mode 100644 index 516e35d..0000000 --- a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs +++ /dev/null @@ -1,236 +0,0 @@ -using System; -using System.Buffers; -using System.IO.Pipelines; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -namespace AyCode.Core.Serializers.Binaries; - -/// -/// Binary input that reads from a PipeReader, requesting more data when the current buffer is exhausted. -/// -/// Mirrors SequenceBinaryInput's segment iteration and cross-boundary handling, but instead of -/// reading from a fixed ReadOnlySequence, it calls PipeReader.ReadAsync() to get more data -/// as chunks arrive from the network. -/// -/// When the writer flushes per chunk (AsyncPipeWriterOutput), ReadAsync() returns as soon as -/// data is available — typically completing synchronously. This enables pipeline parallelism: -/// serialization, network transfer, and deserialization overlap. -/// -public struct PipeReaderBinaryInput : IBinaryInputBase -{ - private readonly PipeReader _pipeReader; - private ReadOnlySequence _currentBuffer; - private SequencePosition _nextSegmentPosition; - private SequencePosition _consumedUpTo; - private bool _pipeCompleted; - private bool _hasActiveRead; - - // Cross-boundary scratch — same pattern as SequenceBinaryInput - private byte[]? _scratchBuffer; - private bool _afterCrossBoundary; - private byte[]? _savedBuffer; - private int _savedPosition; - private int _savedBufferLength; - - /// - /// Synchronously gets the result of a PipeReader.ReadAsync ValueTask. - /// Fast-path: if already completed (data in pipe), returns directly without Task allocation. - /// Slow-path: converts to Task for proper blocking when waiting for writer. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static ReadResult SyncReadResult(ValueTask vt) - => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult(); - - public PipeReaderBinaryInput(PipeReader pipeReader) - { - _pipeReader = pipeReader; - _currentBuffer = default; - _nextSegmentPosition = default; - _consumedUpTo = default; - _pipeCompleted = false; - _hasActiveRead = false; - _scratchBuffer = null; - _afterCrossBoundary = false; - _savedBuffer = null; - _savedPosition = 0; - _savedBufferLength = 0; - } - - /// - /// Reads the first data from the PipeReader and provides the first segment's buffer. - /// - public void Initialize(out byte[] buffer, out int position, out int bufferLength) - { - var result = SyncReadResult(_pipeReader.ReadAsync()); - _currentBuffer = result.Buffer; - _pipeCompleted = result.IsCompleted; - _hasActiveRead = true; - _consumedUpTo = _currentBuffer.Start; - _nextSegmentPosition = _currentBuffer.Start; - - if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0) - throw new AcBinaryDeserializationException("Empty pipe — no data to read."); - - _consumedUpTo = _nextSegmentPosition; // Mark first segment as consumed (same as TryLoadNextSegmentFromBuffer) - ExtractArray(memory, out buffer, out position, out bufferLength); - } - - /// - /// Advances to the next segment. If the current ReadResult buffer is exhausted, - /// calls PipeReader.ReadAsync() to get more data from the pipe. - /// - [MethodImpl(MethodImplOptions.NoInlining)] - public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed) - { - // After cross-boundary scratch read: restore to the last touched segment - if (_afterCrossBoundary) - { - _afterCrossBoundary = false; - buffer = _savedBuffer!; - position = _savedPosition; - bufferLength = _savedBufferLength; - - if (bufferLength - position >= needed) - return true; - } - - var remaining = bufferLength - position; - - if (remaining > 0 && remaining < needed) - return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining); - - // Try next segment in current buffer - if (TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength)) - { - remaining = bufferLength - position; - if (remaining >= needed) return true; - return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining); - } - - // Current buffer exhausted — request more data from PipeReader - if (_pipeCompleted) - return false; - - _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); - _hasActiveRead = false; - - var result = SyncReadResult(_pipeReader.ReadAsync()); - _currentBuffer = result.Buffer; - _pipeCompleted = result.IsCompleted; - _hasActiveRead = true; - _consumedUpTo = _currentBuffer.Start; - _nextSegmentPosition = _currentBuffer.Start; - - if (!TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength)) - return false; - - remaining = bufferLength - position; - if (remaining >= needed) return true; - return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining); - } - - /// - /// Returns the scratch buffer and signals the PipeReader that all data has been consumed. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Release() - { - if (_scratchBuffer != null) - { - ArrayPool.Shared.Return(_scratchBuffer); - _scratchBuffer = null; - } - - if (_hasActiveRead) - { - _pipeReader.AdvanceTo(_currentBuffer.End); - _hasActiveRead = false; - } - } - - private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength) - { - if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0) - return false; - - ExtractArray(memory, out buffer, out position, out bufferLength); - _consumedUpTo = _nextSegmentPosition; - return true; - } - - private bool TryReadCrossBoundary(ref byte[] buffer, ref int position, ref int bufferLength, int needed, int remaining) - { - if (_scratchBuffer == null || _scratchBuffer.Length < needed) - { - if (_scratchBuffer != null) - ArrayPool.Shared.Return(_scratchBuffer); - _scratchBuffer = ArrayPool.Shared.Rent(needed); - } - - // 1) Copy tail of current segment - Buffer.BlockCopy(buffer, position, _scratchBuffer, 0, remaining); - var filled = remaining; - - // 2) Copy from subsequent segments (may span multiple segments or pipe reads) - while (filled < needed) - { - // Try next segment in current buffer - if (_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) && memory.Length > 0) - { - ExtractArray(memory, out var segArray, out var segOffset, out var segBufferLength); - _consumedUpTo = _nextSegmentPosition; - var segCount = segBufferLength - segOffset; - var take = Math.Min(needed - filled, segCount); - Buffer.BlockCopy(segArray, segOffset, _scratchBuffer, filled, take); - filled += take; - - _savedBuffer = segArray; - _savedPosition = segOffset + take; - _savedBufferLength = segBufferLength; - continue; - } - - // Current buffer exhausted — read more from pipe - if (_pipeCompleted) - return false; - - _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); - _hasActiveRead = false; - var result = SyncReadResult(_pipeReader.ReadAsync()); - _currentBuffer = result.Buffer; - _pipeCompleted = result.IsCompleted; - _hasActiveRead = true; - _consumedUpTo = _currentBuffer.Start; - _nextSegmentPosition = _currentBuffer.Start; - - if (_currentBuffer.IsEmpty && _pipeCompleted) - return false; - } - - buffer = _scratchBuffer; - position = 0; - bufferLength = filled; - _afterCrossBoundary = true; - return true; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static void ExtractArray(ReadOnlyMemory memory, out byte[] buffer, out int position, out int bufferLength) - { - if (MemoryMarshal.TryGetArray(memory, out var segment)) - { - buffer = segment.Array!; - position = segment.Offset; - bufferLength = segment.Offset + segment.Count; - } - else - { - var temp = new byte[memory.Length]; - memory.Span.CopyTo(temp); - buffer = temp; - position = 0; - bufferLength = temp.Length; - } - } -} diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs new file mode 100644 index 0000000..d78f3b7 --- /dev/null +++ b/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs @@ -0,0 +1,226 @@ +using System; +using System.Buffers; +using System.Diagnostics; +using System.Threading; +using Microsoft.Extensions.Logging; + +namespace AyCode.Core.Serializers.Binaries; + +/// +/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization. +/// +/// Replaces for the AsyncSegment read path: +/// the main thread writes incoming chunk data via , while a background +/// deserialization thread reads through which blocks +/// on when data is exhausted. +/// +/// Backed by a single contiguous byte[] from . +/// Positions reset to 0 when the consumer catches up (zero-cost reuse). +/// Grow is the absolute last resort (single Rent, practically never happens). +/// +/// Thread-safety: +/// +/// _writePos: written by producer (Volatile.Write), read by consumer (Volatile.Read). +/// _readPos: written by consumer (Volatile.Write), read by producer (Volatile.Read). +/// Reset-to-0 happens in only when _readPos == _writePos +/// (consumer is blocked in TryAdvanceSegment, not reading the buffer). +/// Grow happens in only when reset is insufficient +/// (consumer is behind). Old buffer kept for consumer's local reference; +/// picks up new buffer. +/// +/// +public sealed class SegmentBufferReader : IDisposable +{ + private byte[] _buffer; + private int _writePos; + private int _readPos; // consumer reports consumed position here + private bool _completed; + + private readonly ManualResetEventSlim _dataAvailable; + private readonly ILogger? _logger; + + // After grow: ALL old buffers are kept alive until Dispose. + // Cannot return them to the pool mid-operation because the consumer thread + // may hold a local reference to any of them (its local 'buffer' variable is + // only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind). + private byte[][]? _oldBuffers; + private int _oldBufferCount; + + /// + /// Creates a new SegmentBufferReader with the specified initial capacity. + /// Typical value: chunkSize * 2 (e.g. 8192 for 4096-byte chunks). + /// + /// Initial buffer size. Rounded up by ArrayPool. + /// Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds. + public SegmentBufferReader(int initialCapacity, ILogger? logger = null) + { + if (initialCapacity <= 0) + throw new ArgumentOutOfRangeException(nameof(initialCapacity)); + + _buffer = ArrayPool.Shared.Rent(initialCapacity); + _dataAvailable = new ManualResetEventSlim(false); + _logger = logger; + } + + // --- Producer API (main thread) --- + + /// + /// Appends chunk data to the buffer. + /// Resets positions to 0 when consumer has caught up (zero-cost). + /// Grows as last resort if consumer is behind and buffer is full. + /// Signals the consumer thread that new data is available. + /// + public void Write(ReadOnlySpan data) + { + if (data.IsEmpty) return; + + // If consumer consumed everything → reset positions to 0 (zero-cost reuse) + var rp = Volatile.Read(ref _readPos); + if (rp > 0 && rp == _writePos) + { + DebugLog("Write reset positions rp={Rp} wp={Wp} → 0", rp, _writePos); + _writePos = 0; + Volatile.Write(ref _readPos, 0); + } + + // Grow if buffer can't fit the new data (rare — consumer typically keeps pace) + if (_writePos + data.Length > _buffer.Length) + { + DebugLog("Write grow required wp={Wp} dataLen={DataLen} bufLen={BufLen}", + _writePos, data.Length, _buffer.Length); + Grow(_writePos + data.Length); + } + + data.CopyTo(_buffer.AsSpan(_writePos)); + var newWritePos = _writePos + data.Length; + Volatile.Write(ref _writePos, newWritePos); + _dataAvailable.Set(); + + DebugLog("Write dataLen={DataLen} newWritePos={NewWritePos} readPos={ReadPos}", + data.Length, newWritePos, Volatile.Read(ref _readPos)); + } + + /// + /// Signals that no more data will be written (CHUNK_END received). + /// The consumer's will return false + /// once all buffered data is consumed. + /// + public void Complete() + { + Volatile.Write(ref _completed, true); + _dataAvailable.Set(); + + DebugLog("Complete writePos={Wp} readPos={Rp}", + Volatile.Read(ref _writePos), Volatile.Read(ref _readPos)); + } + + // --- Consumer API (deser thread, used by SegmentBufferReaderInput) --- + + /// Current buffer array. May change after grow — consumer must re-read in TryAdvanceSegment. + internal byte[] Buffer => _buffer; + + /// Current write position. All bytes in [ReadPos..WritePos) are valid. + internal int WritePos => Volatile.Read(ref _writePos); + + /// Consumer's last reported read position. + internal int ReadPos => Volatile.Read(ref _readPos); + + /// True after is called. + internal bool IsCompleted => Volatile.Read(ref _completed); + + /// + /// Called by consumer to report how far it has read. + /// Enables the producer to reset positions to 0 when everything is consumed. + /// + internal void SetReadPos(int position) => Volatile.Write(ref _readPos, position); + + /// Blocks until new data is written or is called. + internal void WaitForData() => _dataAvailable.Wait(); + + /// + /// Resets the signal for the double-check pattern: + /// ResetSignal() → check condition → if false, WaitForData(). + /// + internal void ResetSignal() => _dataAvailable.Reset(); + + // --- Lifecycle --- + + public void Dispose() + { + // Return all old buffers accumulated from grows + if (_oldBuffers != null) + { + for (var i = 0; i < _oldBufferCount; i++) + { + ArrayPool.Shared.Return(_oldBuffers[i]); + _oldBuffers[i] = null!; + } + _oldBuffers = null; + _oldBufferCount = 0; + } + + // Return current buffer + if (_buffer != null!) + { + ArrayPool.Shared.Return(_buffer); + _buffer = null!; + } + + _dataAvailable.Dispose(); + } + + // --- Internal --- + + private void Grow(int requiredCapacity) + { + var newSize = Math.Max(_buffer.Length * 2, requiredCapacity); + var newBuffer = ArrayPool.Shared.Rent(newSize); + System.Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos); + + // Keep the current buffer alive — consumer's local 'buffer' variable may still reference it + // (consumer may lag multiple grows behind before calling TryAdvanceSegment). + // Returning old buffers to the pool mid-operation would cause use-after-free + // if another pool user overwrites them while the consumer is still reading. + if (_oldBuffers == null) + _oldBuffers = new byte[4][]; + else if (_oldBufferCount == _oldBuffers.Length) + Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2); + + _oldBuffers[_oldBufferCount++] = _buffer; + _buffer = newBuffer; + } + + // --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) --- + + /// + /// Emits a debug log if the logger is attached and Debug level is enabled. + /// Compiled out entirely in RELEASE builds via . + /// + [Conditional("DEBUG")] + internal void DebugLog(string message) + { + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("SegmentBufferReader " + message); + } + + [Conditional("DEBUG")] + private void DebugLog(string template, object? arg0) + { + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("SegmentBufferReader " + template, arg0); + } + + [Conditional("DEBUG")] + private void DebugLog(string template, object? arg0, object? arg1) + { + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1); + } + + [Conditional("DEBUG")] + private void DebugLog(string template, object? arg0, object? arg1, object? arg2) + { + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1, arg2); + } +} diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs new file mode 100644 index 0000000..b42fe7b --- /dev/null +++ b/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs @@ -0,0 +1,120 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace AyCode.Core.Serializers.Binaries; + +/// +/// Binary input that reads from a for chunked streaming deserialization. +/// +/// Replaces PipeReaderBinaryInput: instead of blocking on PipeReader.ReadAsync(), +/// blocks on when data is exhausted. Much simpler because +/// the buffer is a single contiguous byte[] — no multi-segment iteration, no cross-boundary +/// scratch buffers. +/// +/// The deserialization context's hot path reads directly from the buffer array using local +/// buffer/position/bufferLength variables. +/// is only called when position >= bufferLength (cold path), at which point it reports +/// the consumed position via , then either +/// provides more data or blocks until data arrives. +/// +/// Position reset: when the producer detects readPos == writePos (all consumed), +/// it resets both to 0. After waking from Wait, this input re-reads the adjusted positions. +/// +public struct SegmentBufferReaderInput : IBinaryInputBase +{ + private readonly SegmentBufferReader _reader; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public SegmentBufferReaderInput(SegmentBufferReader reader) + { + _reader = reader; + } + + /// + /// Provides the initial buffer state. Called once before deserialization begins. + /// Task.Run starts after the first Write() (lazy start in TryParseChunkData), + /// so data is already available — no wait needed. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Initialize(out byte[] buffer, out int position, out int bufferLength) + { + buffer = _reader.Buffer; + position = 0; + bufferLength = _reader.WritePos; + DebugLog("Input.Initialize bufferLength=" + bufferLength); + } + + /// + /// Called when the deserialization context needs more bytes than currently available. + /// Reports consumed position to the producer, then blocks via + /// until enough data arrives or completion is signaled. + /// + /// Uses the double-check pattern to avoid missed signals: + /// Reset() → check → if still not enough, Wait(). + /// + /// No cross-boundary handling needed — the buffer is a single contiguous byte[]. + /// After grow, re-reads _reader.Buffer to get the new (larger) array. + /// After position reset (readPos/writePos set to 0 by producer), re-reads adjusted positions. + /// + [MethodImpl(MethodImplOptions.NoInlining)] + public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed) + { + DebugLog("Input.TryAdvanceSegment enter position=" + position + " bufferLength=" + bufferLength + " needed=" + needed); + + // Report how far we've consumed — enables producer to reset positions to 0 + _reader.SetReadPos(position); + + while (true) + { + // Re-read positions (may have been reset to 0 by producer) + int rp = _reader.ReadPos; + int wp = _reader.WritePos; + + if (wp - rp >= needed) + { + buffer = _reader.Buffer; // may be new array after grow + position = rp; // may be 0 after reset + bufferLength = wp; + DebugLog("Input.TryAdvanceSegment return true (data available) position=" + position + " bufferLength=" + bufferLength); + return true; + } + + if (_reader.IsCompleted) + { + // No more data will arrive. Return whatever is left. + if (wp > rp) + { + buffer = _reader.Buffer; + position = rp; + bufferLength = wp; + DebugLog("Input.TryAdvanceSegment return true (completed, partial) position=" + position + " bufferLength=" + bufferLength); + return true; + } + DebugLog("Input.TryAdvanceSegment return false (completed, empty)"); + return false; // end of input + } + + // Double-check pattern: Reset → verify → Wait + _reader.ResetSignal(); + + rp = _reader.ReadPos; + wp = _reader.WritePos; + if (wp - rp >= needed || _reader.IsCompleted) + continue; // re-check from top + + DebugLog("Input.TryAdvanceSegment waiting (wp=" + wp + " rp=" + rp + " needed=" + needed + ")"); + _reader.WaitForData(); // ManualResetEventSlim.Wait() + DebugLog("Input.TryAdvanceSegment woke up"); + } + } + + [Conditional("DEBUG")] + private void DebugLog(string message) => _reader.DebugLog(message); + + /// + /// No-op. Buffer lifecycle is managed by . + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Release() { } +} diff --git a/AyCode.Services.Server.Tests/SignalRs/AsyncSegmentPipeTransportWriter.cs b/AyCode.Services.Server.Tests/SignalRs/AsyncSegmentPipeTransportWriter.cs new file mode 100644 index 0000000..6b03b58 --- /dev/null +++ b/AyCode.Services.Server.Tests/SignalRs/AsyncSegmentPipeTransportWriter.cs @@ -0,0 +1,171 @@ +using System.Buffers; +using System.IO.Pipelines; +using System.Threading; + +namespace AyCode.Services.Server.Tests.SignalRs; + +/// +/// Pipe-based test transport for AsyncSegment protocol path. +/// +/// Unlike (IBufferWriter only), this exposes a real +/// , so AcBinaryHubProtocol.WriteMessage can enter +/// AsyncSegment chunked mode (output is PipeWriter check). +/// +/// The internal uses a slab-like memory pool with fixed segment size, +/// random offsets and size jitter to better simulate transport behavior. +/// +internal sealed class AsyncSegmentPipeTransportWriter : IDisposable +{ + private readonly Pipe _pipe; + private bool _disposed; + private bool _writerCompleted; + + public AsyncSegmentPipeTransportWriter(int segmentSize = 256, int seed = 42) + { + if (segmentSize <= 0) + throw new ArgumentOutOfRangeException(nameof(segmentSize)); + + _pipe = new Pipe(new PipeOptions( + pool: new SlabSimulatingPool(segmentSize, seed), + readerScheduler: PipeScheduler.Inline, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 0, + resumeWriterThreshold: 0, + minimumSegmentSize: segmentSize, + useSynchronizationContext: false)); + } + + /// + /// Gets the PipeWriter that must be passed to protocol WriteMessage + /// to activate AsyncSegment chunked write path. + /// + public PipeWriter Writer => _pipe.Writer; + + /// + /// Gets the paired PipeReader for test-side inspection and parsing. + /// + public PipeReader Reader => _pipe.Reader; + + /// + /// Completes only the writer side of the internal pipe. + /// Reader remains open so tests can continue draining buffered data. + /// + public void CompleteWriter() + { + ObjectDisposedException.ThrowIf(_disposed, this); + + if (_writerCompleted) + return; + + _writerCompleted = true; + _pipe.Writer.Complete(); + } + + /// + /// Drains all currently available bytes from the reader into a contiguous array. + /// Does not require completing the writer. + /// + public byte[] DrainAvailableBytes() + { + ObjectDisposedException.ThrowIf(_disposed, this); + + var output = new ArrayBufferWriter(); + + while (_pipe.Reader.TryRead(out var result)) + { + var buffer = result.Buffer; + foreach (var segment in buffer) + output.Write(segment.Span); + + _pipe.Reader.AdvanceTo(buffer.End); + + if (result.IsCompleted) + break; + } + + return output.WrittenSpan.ToArray(); + } + + /// + /// Asynchronously drains all data until the writer is completed and the pipe is exhausted. + /// + public async Task DrainAllAsync(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + var output = new ArrayBufferWriter(); + + while (true) + { + var result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + var buffer = result.Buffer; + + foreach (var segment in buffer) + output.Write(segment.Span); + + _pipe.Reader.AdvanceTo(buffer.End); + + if (result.IsCompleted) + break; + } + + return output.WrittenSpan.ToArray(); + } + + /// + /// Completes writer and reader sides of the internal pipe. + /// + public void Dispose() + { + if (_disposed) + return; + + _disposed = true; + if (!_writerCompleted) + { + _writerCompleted = true; + _pipe.Writer.Complete(); + } + _pipe.Reader.Complete(); + } + + private sealed class SlabSimulatingPool : MemoryPool + { + private readonly int _segmentSize; + private readonly Random _rng; + + public SlabSimulatingPool(int segmentSize, int seed) + { + _segmentSize = segmentSize; + _rng = new Random(seed); + } + + public override int MaxBufferSize => _segmentSize; + + public override IMemoryOwner Rent(int minBufferSize = -1) + { + var requested = minBufferSize > 0 ? minBufferSize : _segmentSize; + var size = Math.Max(requested, _segmentSize); + + var offset = _rng.Next(0, Math.Max(1, _segmentSize)); + var jitter = _rng.Next(-_segmentSize / 4, _segmentSize / 4 + 1); + var actualSize = Math.Max(16, size + jitter); + + var array = new byte[actualSize + offset]; + return new Owner(array, offset, actualSize); + } + + protected override void Dispose(bool disposing) + { + } + + private sealed class Owner(byte[] array, int offset, int length) : IMemoryOwner + { + public Memory Memory { get; } = array.AsMemory(offset, length); + + public void Dispose() + { + } + } + } +} diff --git a/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs b/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs index e0dc280..cf2a211 100644 --- a/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs +++ b/AyCode.Services.Server.Tests/SignalRs/TestMultiSegmentProtocol.cs @@ -1,6 +1,7 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; using System.IO.Pipelines; +using AyCode.Core.Serializers.Binaries; using AyCode.Services.SignalRs; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Protocol; @@ -25,9 +26,12 @@ namespace AyCode.Services.Server.Tests.SignalRs; internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol { private const int SegmentSize = 256; + private readonly BinaryProtocolMode _mode; - public TestMultiSegmentProtocol() + public TestMultiSegmentProtocol(BinaryProtocolMode mode = BinaryProtocolMode.Bytes) + : base(AcBinarySerializerOptions.Default, mode) { + _mode = mode; Options.BufferWriterChunkSize = SegmentSize; } @@ -38,6 +42,9 @@ internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol /// public ReadOnlyMemory GetMessageBytesMultiSegment(HubMessage message) { + if (_mode == BinaryProtocolMode.AsyncSegment) + return GetMessageBytesAsyncSegment(message); + // ── Transport-double path (production simulation) ────────────────── var transport = new SlabTransportWriter(SegmentSize); WriteMessage(message, transport); @@ -58,6 +65,18 @@ internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol return transportBytes; } + /// + /// AsyncSegment write side: uses a real PipeWriter transport so chunked protocol path activates. + /// + private ReadOnlyMemory GetMessageBytesAsyncSegment(HubMessage message) + { + using var transport = new AsyncSegmentPipeTransportWriter(SegmentSize); + WriteMessage(message, transport.Writer); + transport.CompleteWriter(); + var bytes = transport.DrainAllAsync().GetAwaiter().GetResult(); + return bytes; + } + /// /// Read side: fill Pipe with 256-byte slab segments → multi-segment ReadOnlySequence. /// Same as production Kestrel PipeReader delivering slab-sized segments. diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs index d0264e0..e4d6f11 100644 --- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs +++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs @@ -19,18 +19,19 @@ public class TestableSignalRClient2 : AcSignalRClientBase, IAcSignalRHubItemServ { private HubConnectionState _connectionState = HubConnectionState.Connected; private readonly TestableSignalRHub2 _signalRHub; - private readonly TestMultiSegmentProtocol _protocol = new(); + private readonly TestMultiSegmentProtocol _protocol; private readonly TestInvocationBinder _binder = new(); /// /// Testable SignalR client that allows testing without real HubConnection. /// - public TestableSignalRClient2(TestableSignalRHub2 signalRHub, TestLogger logger) : base(logger) + public TestableSignalRClient2(TestableSignalRHub2 signalRHub, TestLogger logger, BinaryProtocolMode mode = BinaryProtocolMode.Bytes) : base(logger) { MsDelay = 0; MsFirstDelay = 0; _signalRHub = signalRHub; + _protocol = new TestMultiSegmentProtocol(mode); } #region Override virtual methods for testing diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs index 02ee199..d93695c 100644 --- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs +++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs @@ -22,7 +22,7 @@ namespace AyCode.Services.Server.Tests.SignalRs; public class TestableSignalRHub2 : AcWebSignalRHubBase { private IAcSignalRHubItemServer _callerClient; - private readonly TestMultiSegmentProtocol _protocol = new(); + private readonly TestMultiSegmentProtocol _protocol; private readonly TestInvocationBinder _binder = new(); #region Test Configuration @@ -49,14 +49,16 @@ public class TestableSignalRHub2 : AcWebSignalRHubBasePer-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly). - private readonly ConditionalWeakTable? _chunkStates; + /// + /// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly). + /// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server. + /// + private readonly ConditionalWeakTable _chunkStates; private sealed class AsyncChunkState { @@ -69,8 +72,16 @@ public class AcBinaryHubProtocol : IHubProtocol public object?[] Args = null!; public int StreamedArgIndex; public Type StreamedArgType = null!; - public Pipe InternalPipe = null!; + public SegmentBufferReader Buffer = null!; public Task? DeserTask; + + /// + /// Total bytes of chunk frame data already consumed from the input stream + /// (including [201][UINT16] framing headers + data bytes). + /// Used to skip already-processed chunks when SignalR re-presents the buffer + /// after a false-returning TryParseMessage call. + /// + public int ChunkFrameBytesConsumed; } public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } @@ -81,9 +92,7 @@ public class AcBinaryHubProtocol : IHubProtocol _options.BufferWriterChunkSize = 4096; _protocolMode = protocolMode; _logger = logger; - _chunkStates = protocolMode == BinaryProtocolMode.AsyncSegment - ? new ConditionalWeakTable() - : null; + _chunkStates = new ConditionalWeakTable(); } /// @@ -410,10 +419,36 @@ public class AcBinaryHubProtocol : IHubProtocol { message = null; - // AsyncSegment chunk mode: non-standard framing (no INT32 length prefix) - if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState)) + // AsyncSegment chunk mode + if (_chunkStates.TryGetValue(binder, out var chunkState)) { - _logger?.LogTrace("TryParseMessage chunk mode active, inputLength={InputLength}", input.Length); + // Guard against buffer re-presentation: if SignalR re-submitted the same buffer + // (because our previous fallthrough returned false without advancing), + // the buffer may still contain: + // 1. The already-processed CHUNK_START frame + // 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously) + // Skip both to avoid duplicate writes to state.Buffer. + if (TrySkipRepresentedChunkStart(ref input)) + { + _logger?.LogInformation("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length); + + // Also skip already-consumed chunk frame bytes (re-presented along with CHUNK_START) + if (chunkState.ChunkFrameBytesConsumed > 0) + { + if (input.Length < chunkState.ChunkFrameBytesConsumed) + { + _logger?.LogWarning("TryParseMessage re-presentation inconsistency: expected >= {Expected} already-consumed bytes but only {Actual} in buffer", + chunkState.ChunkFrameBytesConsumed, input.Length); + return false; + } + input = input.Slice(chunkState.ChunkFrameBytesConsumed); + _logger?.LogInformation("TryParseMessage skipped {Bytes} already-consumed chunk frame bytes, remainingInput={RemainingInput}", + chunkState.ChunkFrameBytesConsumed, input.Length); + } + } + + _logger?.LogInformation("TryParseMessage chunk mode active binderHash={BinderHash} inputLength={InputLength} firstByte={FirstByte}", + binder.GetHashCode(), input.Length, input.Length > 0 ? input.FirstSpan[0] : (byte)0); return TryParseChunkData(ref input, chunkState, binder, out message); } @@ -429,27 +464,64 @@ public class AcBinaryHubProtocol : IHubProtocol message = ParseMessage(ref reader, payloadLength, binder); - input = input.Slice(LengthPrefixSize + payloadLength); if (message != null) { + input = input.Slice(LengthPrefixSize + payloadLength); if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name); return true; } - // CHUNK_START consumed but no message yet — chunk mode just activated. - // Must try chunk data immediately; returning false here would cause SignalR - // to call AdvanceTo(examined=end) and wait for new data, even though - // CHUNK_DATA/CHUNK_END may already be in the remaining buffer. - if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState)) + // CHUNK_START consumed but no complete HubMessage yet (chunk mode just activated). + // Try to process any remaining chunk data already in the buffer. + if (_chunkStates.TryGetValue(binder, out chunkState)) { - _logger?.LogDebug("TryParseMessage CHUNK_START activated, fallthrough to TryParseChunkData remainingInput={RemainingInput}", input.Length); - return TryParseChunkData(ref input, chunkState, binder, out message); + var afterChunkStart = input.Slice(LengthPrefixSize + payloadLength); + if (TryParseChunkData(ref afterChunkStart, chunkState, binder, out message)) + { + // Full chunked message processed in one call + input = afterChunkStart; + _logger?.LogInformation("TryParseMessage CHUNK_START + chunk data processed in single call"); + return true; + } + + // IMPORTANT: do NOT advance input when returning false. + // SignalR's contract is "advance only on success". If we advance here, + // the buffer state becomes inconsistent on re-submission. + // On next call, the buffer may re-present CHUNK_START bytes; the chunk-mode + // block above handles that via TrySkipRepresentedChunkStart. + _logger?.LogInformation("TryParseMessage CHUNK_START parsed, state added, waiting for chunk data (not advancing)"); + return false; } return false; } + /// + /// Detects if the buffer starts with a re-presented CHUNK_START frame pattern + /// ([INT32 length][CHUNK_START marker]). If so, advances + /// past the entire frame and returns true. + /// + /// This guards against the case where SignalR's buffer management re-presents + /// bytes we logically consumed during a previous false-returning TryParseMessage call. + /// + private static bool TrySkipRepresentedChunkStart(ref ReadOnlySequence input) + { + if (input.Length < LengthPrefixSize + 1) return false; + + Span header = stackalloc byte[LengthPrefixSize + 1]; + input.Slice(0, LengthPrefixSize + 1).CopyTo(header); + + int maybeLen = System.Buffers.Binary.BinaryPrimitives.ReadInt32LittleEndian(header.Slice(0, LengthPrefixSize)); + byte maybeMarker = header[LengthPrefixSize]; + + if (maybeMarker != MsgAsyncChunkStart) return false; + if (maybeLen <= 0 || input.Length < LengthPrefixSize + maybeLen) return false; + + input = input.Slice(LengthPrefixSize + maybeLen); + return true; + } + private HubMessage? ParseMessage(ref SequenceReader r, int payloadLength, IInvocationBinder binder) { if (payloadLength == 0) @@ -637,51 +709,64 @@ public class AcBinaryHubProtocol : IHubProtocol _logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length); - // Write chunk data to internal pipe for background deserialization + // Write chunk data to SegmentBufferReader for background deserialization if (chunkDataSize > 0) { var dataSlice = input.Slice(3, chunkDataSize); foreach (var segment in dataSlice) - state.InternalPipe.Writer.Write(segment.Span); - SyncFlush(state.InternalPipe.Writer.FlushAsync()); + state.Buffer.Write(segment.Span); } - // Lazy start: begin background deserialization after first chunk is in the pipe. - // Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available. + // Lazy start: begin background deserialization after first chunk is written. + // SegmentBufferReaderInput.Initialize reads the already-written data immediately. if (state.DeserTask == null) { _logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name); - var pipeReader = state.InternalPipe.Reader; + var reader = state.Buffer; var type = state.StreamedArgType; var opts = _options; - state.DeserTask = Task.Run(() => (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts)); + state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(reader, type, opts)); } input = input.Slice(totalNeeded); + state.ChunkFrameBytesConsumed += totalNeeded; continue; // try next chunk immediately } if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data) { - _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — completing pipe"); + _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion"); // Signal end of data → background deser task completes - state.InternalPipe.Writer.Complete(); + state.Buffer.Complete(); object? deserializedArg = null; - if (state.DeserTask != null) + try { - deserializedArg = state.DeserTask.GetAwaiter().GetResult(); - state.InternalPipe.Reader.Complete(); + if (state.DeserTask != null) + { + deserializedArg = state.DeserTask.GetAwaiter().GetResult(); - if (_logger!.IsEnabled(LogLevel.Debug)) _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); + } + } + catch (Exception ex) + { + _logger?.LogError(ex, "TryParseChunkData deserialization FAILED targetType={TargetType}", state.StreamedArgType.Name); + throw; + } + finally + { + _logger?.LogDebug("TryParseChunkData [202] cleanup: Buffer.Dispose + _chunkStates.Remove"); + state.Buffer.Dispose(); + _chunkStates.Remove(binder); } // Fill the placeholder in the stored message's args FillStreamedArg(state, deserializedArg); - _chunkStates!.Remove(binder); input = input.Slice(1); // consume the single [202] byte message = state.PartialMessage; @@ -690,7 +775,16 @@ public class AcBinaryHubProtocol : IHubProtocol } // Unknown byte in chunk mode — break out (shouldn't happen) - _logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking", firstByte); + _logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking. " + + "binderHash={BinderHash} inputLength={InputLength} " + + "state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} bufferWritePos={WritePos} bufferReadPos={ReadPos}", + firstByte, + binder.GetHashCode(), + input.Length, + state.StreamedArgType.Name, + state.DeserTask?.Status.ToString() ?? "null", + state.Buffer.WritePos, + state.Buffer.ReadPos); break; } @@ -699,7 +793,7 @@ public class AcBinaryHubProtocol : IHubProtocol /// /// Parses CHUNK_START: reads original message (with -1 marker for streamed arg), - /// creates internal Pipe, starts background deserialization task, stores state. + /// creates SegmentBufferReader, stores state. Background deser task starts lazily on first chunk. /// Returns null to signal "consumed bytes, no complete message yet". /// private HubMessage? ParseAsyncChunkStart(ref SequenceReader r, IInvocationBinder binder) @@ -733,11 +827,13 @@ public class AcBinaryHubProtocol : IHubProtocol Args = args, StreamedArgIndex = streamedIndex, StreamedArgType = streamedType, - InternalPipe = new Pipe() + Buffer = new SegmentBufferReader(_options.BufferWriterChunkSize * 2, _logger) // DeserTask started lazily in TryParseChunkData after first chunk is written }; - _chunkStates!.AddOrUpdate(binder, state); + _chunkStates.AddOrUpdate(binder, state); + _logger?.LogInformation("ParseAsyncChunkStart _chunkStates.AddOrUpdate binderHash={BinderHash} streamedArgType={TargetType}", + binder.GetHashCode(), streamedType.Name); return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData }