diff --git a/.github/skills/docs-discovery/SKILL.md b/.github/skills/docs-discovery/SKILL.md index 27eff59..cf1e5bc 100644 --- a/.github/skills/docs-discovery/SKILL.md +++ b/.github/skills/docs-discovery/SKILL.md @@ -18,7 +18,7 @@ This skill READS `.md` files and updates the LLM's `[LOADED_DOCS: ...]` state. I Parse the user's most recent message (and the wider conversation tail if relevant) for concrete concepts. Examples: -- Class / type names: `AcLoggerBase`, `SegmentBufferReader`, `AcBinaryHubProtocol`, `SignalRClient` (any derived/consumer-specific type) +- Class / type names: `AcLoggerBase`, `AsyncPipeReaderInput`, `AcBinaryHubProtocol`, `SignalRClient` (any derived/consumer-specific type) - Feature areas: "logger", "log writer", "serializer", "SignalR", "hub protocol", "chunked framing", "connection builder", "options" - File hints: `Program.cs`, `AcLoggerBase.cs`, `SIGNALR.md` - Patterns / idioms: "DI factory", "appsettings", "mode negotiation" diff --git a/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj b/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj index 0f900d8..39a0534 100644 --- a/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj +++ b/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj @@ -1,21 +1,32 @@  - - - - + + + + - - - - - + + + + + - - Exe - net9.0 - enable - enable - + + Exe + net9.0 + enable + enable + + + true + true + + + + true + false + + true + diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index fa461e6..b756403 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -557,7 +557,9 @@ public static class Program new AcBinaryBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode"), // Fastest Byte[] — Runtime path (UseGeneratedCode=false). Same wire/options, no source-generated dispatch. // Always paired with the SGen variant so every layer can compare the SGen speed-up apples-to-apples. - new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"), + // COMMENTED: Reflection.Emit-based dispatch crashes under NativeAOT (PlatformNotSupportedException). + // Re-enable for JIT-mode benchmarks where SGen-vs-Runtime delta matters. + //new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"), // Default preset Byte[] — RefHandling=OnlyId (deduplicates IId-shared references on the wire) + // UseStringInterning=All (deduplicates repeated strings). Showcases the Default preset's wire-size // and CPU trade-off vs FastMode on the ~20% IId-ref / repeated-string test data. diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index d759ff7..68a103a 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -284,15 +284,6 @@ public static partial class AcBinaryDeserializer return DeserializeSequence(new SequenceBinaryInput(data), targetType, options); } - /// - /// Deserialize from a with streaming pipeline parallelism. - /// The producer thread writes chunk data via , - /// while this method (running on a background thread) deserializes incrementally, - /// blocking on when data is exhausted. - /// - public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options) - => DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options); - /// /// Deserialize from an with streaming pipeline parallelism. /// The producer thread feeds chunk data via , diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 3b59c93..d01194d 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -555,10 +555,24 @@ public static partial class AcBinarySerializer { if (value == null) { - // Null: write directly, no chunking needed - var span = pipeWriter.GetSpan(1); - span[0] = BinaryTypeCode.Null; - pipeWriter.Advance(1); + if (!multiMessage) + { + // Raw single-message mode: null is just a [BinaryTypeCode.Null] byte on the wire. + // No chunking needed, no [201]/[202] framing. + var span = pipeWriter.GetSpan(1); + span[0] = BinaryTypeCode.Null; + pipeWriter.Advance(1); + return 1; + } + + // Framed mode (multiMessage=true): null still needs to flow through AsyncPipeWriterOutput + // so the wire is [201][UINT16=1][BinaryTypeCode.Null][202] — well-formed chunked frame the + // receiver can parse. Bypassing AsyncPipeWriterOutput here would emit a bare [Null] byte that + // breaks framing for any chunked-stream consumer (e.g. AcBinaryHubProtocol AsyncSegment). + var nullOutput = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, multiMessage: true, flushPolicy, flushTimeout); + nullOutput.Initialize(out var nullBuf, out var nullPos, out _); + nullBuf[nullPos++] = BinaryTypeCode.Null; + nullOutput.Flush(nullBuf, nullPos); return 1; } diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs index f251dfc..3845f36 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs @@ -6,10 +6,9 @@ namespace AyCode.Core.Serializers.Binaries; /// /// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization. -/// -/// Self-contained implementation that consolidates the legacy -/// SegmentBufferReader + SegmentBufferReaderInput pair into a single sealed class -/// (see ADR-0003 at docs/adr/0003-acbinary-streaming-receive-architecture.md). +/// Self-contained implementation — universal receive-side primitive +/// for the AcBinary streaming framework (see ADR-0003 at +/// docs/adr/0003-acbinary-streaming-receive-architecture.md). /// /// The naming mirrors the send-side AsyncPipeWriterOutput primitive — both follow the /// .NET BCL convention for type-level Async prefix (AsyncEnumerable, diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs index 2b1417e..3a4296f 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs @@ -29,9 +29,9 @@ namespace AyCode.Core.Serializers.Binaries; /// Why not relax the deserializer's struct constraint? 16+ generic declarations /// rely on it (DeserializeSequence, TypeReaderTable, /// DeserializationContextPool chain). Relaxing would require a wide refactor with risk -/// to the existing perf-critical struct paths (SegmentBufferReaderInput, -/// ArrayBinaryInput, SequenceBinaryInput). The ~2 ns per call savings is -/// sub-picosecond per byte at typical chunk sizes — not worth the blast radius. +/// to the existing perf-critical struct paths (ArrayBinaryInput, +/// SequenceBinaryInput). The ~2 ns per call savings is sub-picosecond per byte at +/// typical chunk sizes — not worth the blast radius. /// internal readonly struct AsyncPipeReaderInputAdapter : IBinaryInputBase { diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index e47d584..302e883 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -40,11 +40,13 @@ namespace AyCode.Core.Serializers.Binaries; /// Grow waits only if previous flush hasn't completed. Peak memory: ~chunk_size × 2. /// Pro: max producer/flush parallelism with bounded memory. /// Con: slow consumer blocks producer at next Grow (bounded by flushTimeout). -/// : Grow() never awaits; Pipe coalesces flushes up to -/// PauseWriterThreshold (~64 KB). Peak memory: up to PauseWriterThreshold. -/// Pro: highest throughput on bounded payloads. -/// Con: peak memory unbounded by chunk_size; under heavy backpressure may fall back to -/// an owned buffer, losing zero-copy for that chunk. +/// : chunks accumulate while a flush is in-flight; on +/// the per-window safety threshold (~64 KB), the producer waits for the prior flush, then fires +/// one batched flush covering the whole window and resets the window-counter. Peak memory: +/// ~64 KB per window. Pro: dramatically fewer FlushAsync syscalls (one per ~64 KB window +/// instead of one per chunk) → major throughput win on transports with non-trivial flush overhead. +/// Con: per-window peak ~64 KB; under heavy backpressure may fall back to an owned buffer, +/// losing zero-copy for that chunk. /// /// /// Flush strategy auto-selects on writer type — Stream-backed PipeWriters @@ -119,7 +121,15 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private readonly FlushPolicy _flushPolicy; private readonly bool _serializeFlushAndAcquire; private readonly TimeSpan _flushTimeout; + // Cumulative total of data bytes committed since Initialize. Used by GetTotalPosition for the + // Position property and the Serialize* methods' return value. Never resets mid-Serialize. private int _committedBytes; + // Window-local counter of data bytes committed since the last FlushAsync was fired (Coalesced + // mode safety-net). Resets when a new FlushAsync starts, so each flush window starts from 0 + // and the safety-net trip-point (~64 KB) bounds peak in-flight buffer per window — not the + // total payload size. Without this, Coalesced would degrade to per-chunk-sync behavior after + // the first ~15 chunks of any multi-MB payload. + private int _unflushedBytes; private int _currentChunkStart; // Whether the current chunk's buffer is the owned ArrayPool fallback (true) or a zero-copy // PipeWriter slab (false). Used by CommitCurrentChunk to pick the commit strategy. @@ -180,6 +190,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase _serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter); _committedBytes = 0; + _unflushedBytes = 0; _hasOwnedBuffer = false; _ownedBuffer = null; _lastFlush = default; @@ -215,6 +226,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public void Initialize(out byte[] buffer, out int position, out int bufferEnd) { _committedBytes = 0; + _unflushedBytes = 0; _lastFlush = default; AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); @@ -247,16 +259,28 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase // the next chunk into the PipeWriter's buffer concurrently with the background FlushAsync. // FlushPolicy.DoubleBuffered: wait at next Grow if the previous parallel flush hasn't // completed yet → max two chunks in flight (~chunk_size × 2 peak memory). - // FlushPolicy.Coalesced: skip the per-chunk wait; only block when _committedBytes - // approaches the Pipe's PauseWriterThreshold (~64 KB), preventing runaway buffer - // growth under a slow consumer. + // FlushPolicy.Coalesced: skip the per-chunk wait — bytes accumulate in the PipeWriter's + // buffer until _unflushedBytes approaches the Pipe's PauseWriterThreshold (~64 KB). + // At that point we wait for the prior flush, then fire ONE flush that batches the + // entire window. After firing, _unflushedBytes resets so the next window can begin. + // This produces ~64 KB-sized flush windows instead of per-chunk flushes — fewer + // syscalls, better network throughput. // The conditional FlushAsync at the end avoids double-flush if the previous flush - // is still in progress (Coalesced skip path). - if ((_flushPolicy == FlushPolicy.DoubleBuffered && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); + // is still in progress (Coalesced skip path — keep accumulating). + if ((_flushPolicy == FlushPolicy.DoubleBuffered && !_lastFlush.IsCompleted) || _unflushedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); - if (_lastFlush.IsCompleted) _lastFlush = _pipeWriter.FlushAsync(); + if (_lastFlush.IsCompleted) + { + // The accumulated unflushed bytes are about to be moved into this flush — reset the + // window-counter so the next flush window starts fresh. Without this reset, the + // safety-net would trip permanently after the first ~15 chunks of a multi-MB payload, + // degrading Coalesced to per-chunk-sync behavior. _committedBytes (cumulative total + // for GetTotalPosition) is intentionally unchanged — only the window-counter resets. + _unflushedBytes = 0; + _lastFlush = _pipeWriter.FlushAsync(); + } } // Acquire new chunk with header reservation (common to both paths). @@ -378,7 +402,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase else _pipeWriter.Advance(dataBytes); } - _committedBytes += dataBytes; // only count data bytes, not framing + // Both counters track only data bytes (not framing). _committedBytes is the cumulative + // total used by GetTotalPosition; _unflushedBytes is the per-window counter used by the + // Coalesced safety-net (resets each time a new FlushAsync starts). + _committedBytes += dataBytes; + _unflushedBytes += dataBytes; } /// diff --git a/AyCode.Core/Serializers/Binaries/FlushPolicy.cs b/AyCode.Core/Serializers/Binaries/FlushPolicy.cs index cbf665f..ac11821 100644 --- a/AyCode.Core/Serializers/Binaries/FlushPolicy.cs +++ b/AyCode.Core/Serializers/Binaries/FlushPolicy.cs @@ -44,19 +44,24 @@ public enum FlushPolicy DoubleBuffered, /// - /// Coalesced — producer never awaits per-chunk flushes. The underlying Pipe coalesces - /// flushes adaptively up to its PauseWriterThreshold (~64 KB committed bytes by default). - /// Producer continues serializing in parallel with the consumer's drain; if the consumer - /// catches up earlier, the pipe flushes sooner and the producer keeps going without waiting. - /// Peak memory: grows up to PauseWriterThreshold (~64 KB) under slow - /// consumer; close to chunk_size × 2 under fast consumer. - /// Pro: highest throughput on bounded payloads; never blocks for fast consumers; - /// pipe-managed adaptive backpressure kicks in only when actually needed. - /// Con: peak memory unbounded by chunk_size — grows to PauseWriterThreshold under - /// slow-consumer conditions; under heavy backpressure may fall back to an owned buffer + /// Coalesced — producer batches up to ~64 KB worth of chunks into a single flush window. + /// While a previous FlushAsync is in-flight, new chunks accumulate in the + /// 's buffer without firing additional flushes. + /// When the per-window counter approaches the safety threshold (~64 KB), the producer waits + /// for the in-flight flush to complete, then fires one batched flush covering the + /// entire window. After firing, the window-counter resets and the next window begins. + /// Peak memory: bounded per-window by ~64 KB (the in-flight chunks during one + /// flush window). Total peak in process ≤ chunk_size × 2 + ~64 KB pipe-internal accumulation. + /// Pro: dramatically fewer FlushAsync syscalls compared to per-chunk modes — + /// e.g. a 9.5 MB payload at 4 KB chunks fires ~150 flushes (one per ~64 KB window) instead of + /// ~2 300 flushes (one per chunk). Major throughput win on transports where each FlushAsync + /// has non-trivial overhead (network sockets, Kestrel WebSocket, kernel TCP buffers). + /// Con: per-window peak memory ~64 KB (vs chunk_size × 2 in + /// ); under heavy backpressure may fall back to an owned buffer /// (losing zero-copy for that chunk). /// Recommended when payload size is known and bounded (REST request/response, fixed-size - /// IPC message), and the consumer is reliably fast. + /// IPC message), large enough to span multiple flush windows, and the consumer is reliably + /// fast enough to drain the pipe between windows. /// Coalesced } diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs deleted file mode 100644 index 3963d76..0000000 --- a/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs +++ /dev/null @@ -1,228 +0,0 @@ -using System; -using System.Buffers; -using System.Diagnostics; -using System.Threading; -using Microsoft.Extensions.Logging; - -namespace AyCode.Core.Serializers.Binaries; - -/// -/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization. -/// -/// Replaces for the AsyncSegment read path: -/// the main thread writes incoming chunk data via , while a background -/// deserialization thread reads through which blocks -/// on when data is exhausted. -/// -/// Backed by a single contiguous byte[] from . -/// Positions reset to 0 when the consumer catches up (zero-cost reuse). -/// Grow is the absolute last resort (single Rent, practically never happens). -/// -/// Thread-safety: -/// -/// _writePos: written by producer (Volatile.Write), read by consumer (Volatile.Read). -/// _readPos: written by consumer (Volatile.Write), read by producer (Volatile.Read). -/// Reset-to-0 happens in only when _readPos == _writePos -/// (consumer is blocked in TryAdvanceSegment, not reading the buffer). -/// Grow happens in only when reset is insufficient -/// (consumer is behind). Old buffer kept for consumer's local reference; -/// picks up new buffer. -/// -/// -public sealed class SegmentBufferReader : IDisposable -{ - private byte[] _buffer; - private int _writePos; - private int _readPos; // consumer reports consumed position here - private bool _completed; - - private readonly ManualResetEventSlim _dataAvailable; - private readonly ILogger? _logger; - - // After grow: ALL old buffers are kept alive until Dispose. - // Cannot return them to the pool mid-operation because the consumer thread - // may hold a local reference to any of them (its local 'buffer' variable is - // only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind). - private byte[][]? _oldBuffers; - private int _oldBufferCount; - - /// - /// Creates a new SegmentBufferReader with the specified initial capacity. - /// Recommended: options.BufferWriterChunkSize * 2 (e.g. 8 KB for the SignalR-context 4 KB chunk size, - /// 128 KB for the standalone 64 KB default). See class remarks - /// for the full sizing rationale (two-chunks-worth headroom + reset-to-0 cycling). - /// - /// Initial buffer size. Rounded up by ArrayPool. - /// Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds. - public SegmentBufferReader(int initialCapacity, ILogger? logger = null) - { - if (initialCapacity <= 0) - throw new ArgumentOutOfRangeException(nameof(initialCapacity)); - - _buffer = ArrayPool.Shared.Rent(initialCapacity); - _dataAvailable = new ManualResetEventSlim(false); - _logger = logger; - } - - // --- Producer API (main thread) --- - - /// - /// Appends chunk data to the buffer. - /// Resets positions to 0 when consumer has caught up (zero-cost). - /// Grows as last resort if consumer is behind and buffer is full. - /// Signals the consumer thread that new data is available. - /// - public void Write(ReadOnlySpan data) - { - if (data.IsEmpty) return; - - // If consumer consumed everything → reset positions to 0 (zero-cost reuse) - var rp = Volatile.Read(ref _readPos); - if (rp > 0 && rp == _writePos) - { - DebugLog("Write reset positions rp={Rp} wp={Wp} → 0", rp, _writePos); - _writePos = 0; - Volatile.Write(ref _readPos, 0); - } - - // Grow if buffer can't fit the new data (rare — consumer typically keeps pace) - if (_writePos + data.Length > _buffer.Length) - { - DebugLog("Write grow required wp={Wp} dataLen={DataLen} bufLen={BufLen}", - _writePos, data.Length, _buffer.Length); - Grow(_writePos + data.Length); - } - - data.CopyTo(_buffer.AsSpan(_writePos)); - var newWritePos = _writePos + data.Length; - Volatile.Write(ref _writePos, newWritePos); - _dataAvailable.Set(); - - DebugLog("Write dataLen={DataLen} newWritePos={NewWritePos} readPos={ReadPos}", - data.Length, newWritePos, Volatile.Read(ref _readPos)); - } - - /// - /// Signals that no more data will be written (CHUNK_END received). - /// The consumer's will return false - /// once all buffered data is consumed. - /// - public void Complete() - { - Volatile.Write(ref _completed, true); - _dataAvailable.Set(); - - DebugLog("Complete writePos={Wp} readPos={Rp}", - Volatile.Read(ref _writePos), Volatile.Read(ref _readPos)); - } - - // --- Consumer API (deser thread, used by SegmentBufferReaderInput) --- - - /// Current buffer array. May change after grow — consumer must re-read in TryAdvanceSegment. - internal byte[] Buffer => _buffer; - - /// Current write position. All bytes in [ReadPos..WritePos) are valid. - internal int WritePos => Volatile.Read(ref _writePos); - - /// Consumer's last reported read position. - internal int ReadPos => Volatile.Read(ref _readPos); - - /// True after is called. - internal bool IsCompleted => Volatile.Read(ref _completed); - - /// - /// Called by consumer to report how far it has read. - /// Enables the producer to reset positions to 0 when everything is consumed. - /// - internal void SetReadPos(int position) => Volatile.Write(ref _readPos, position); - - /// Blocks until new data is written or is called. - internal void WaitForData() => _dataAvailable.Wait(); - - /// - /// Resets the signal for the double-check pattern: - /// ResetSignal() → check condition → if false, WaitForData(). - /// - internal void ResetSignal() => _dataAvailable.Reset(); - - // --- Lifecycle --- - - public void Dispose() - { - // Return all old buffers accumulated from grows - if (_oldBuffers != null) - { - for (var i = 0; i < _oldBufferCount; i++) - { - ArrayPool.Shared.Return(_oldBuffers[i]); - _oldBuffers[i] = null!; - } - _oldBuffers = null; - _oldBufferCount = 0; - } - - // Return current buffer - if (_buffer != null!) - { - ArrayPool.Shared.Return(_buffer); - _buffer = null!; - } - - _dataAvailable.Dispose(); - } - - // --- Internal --- - - private void Grow(int requiredCapacity) - { - var newSize = Math.Max(_buffer.Length * 2, requiredCapacity); - var newBuffer = ArrayPool.Shared.Rent(newSize); - System.Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos); - - // Keep the current buffer alive — consumer's local 'buffer' variable may still reference it - // (consumer may lag multiple grows behind before calling TryAdvanceSegment). - // Returning old buffers to the pool mid-operation would cause use-after-free - // if another pool user overwrites them while the consumer is still reading. - if (_oldBuffers == null) - _oldBuffers = new byte[4][]; - else if (_oldBufferCount == _oldBuffers.Length) - Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2); - - _oldBuffers[_oldBufferCount++] = _buffer; - _buffer = newBuffer; - } - - // --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) --- - - /// - /// Emits a debug log if the logger is attached and Debug level is enabled. - /// Compiled out entirely in RELEASE builds via . - /// - [Conditional("DEBUG")] - internal void DebugLog(string message) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + message); - } - - [Conditional("DEBUG")] - private void DebugLog(string template, object? arg0) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + template, arg0); - } - - [Conditional("DEBUG")] - private void DebugLog(string template, object? arg0, object? arg1) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1); - } - - [Conditional("DEBUG")] - private void DebugLog(string template, object? arg0, object? arg1, object? arg2) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1, arg2); - } -} diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs deleted file mode 100644 index 1763ed8..0000000 --- a/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs +++ /dev/null @@ -1,130 +0,0 @@ -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Threading; - -namespace AyCode.Core.Serializers.Binaries; - -/// -/// Binary input that reads from a for chunked streaming deserialization. -/// -/// Replaces PipeReaderBinaryInput: instead of blocking on PipeReader.ReadAsync(), -/// blocks on when data is exhausted. Much simpler because -/// the buffer is a single contiguous byte[] — no multi-segment iteration, no cross-boundary -/// scratch buffers. -/// -/// The deserialization context's hot path reads directly from the buffer array using local -/// buffer/position/bufferLength variables. -/// is only called when position >= bufferLength (cold path), at which point it reports -/// the consumed position via , then either -/// provides more data or blocks until data arrives. -/// -/// Position reset: when the producer detects readPos == writePos (all consumed), -/// it resets both to 0. After waking from Wait, this input re-reads the adjusted positions. -/// -/// -/// Recommended initialCapacity: -/// options.BufferWriterChunkSize * 2 (typically 8 KB for the SignalR-context 4 KB chunk size, -/// 128 KB for the standalone 64 KB default). Sized to fit two chunks worth of in-flight bytes — -/// enough headroom for the producer to write the next chunk while the consumer is reading the -/// previous, with reset-to-0 cycling reusing the same buffer for the message's lifetime regardless -/// of total payload size. Larger values waste memory; smaller values trigger occasional grows -/// under burst-write conditions. -/// -/// -public struct SegmentBufferReaderInput : IBinaryInputBase -{ - private readonly SegmentBufferReader _reader; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public SegmentBufferReaderInput(SegmentBufferReader reader) - { - _reader = reader; - } - - /// - /// Provides the initial buffer state. Called once before deserialization begins. - /// Task.Run starts after the first Write() (lazy start in TryParseChunkData), - /// so data is already available — no wait needed. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Initialize(out byte[] buffer, out int position, out int bufferLength) - { - buffer = _reader.Buffer; - position = 0; - bufferLength = _reader.WritePos; - DebugLog("Input.Initialize bufferLength=" + bufferLength); - } - - /// - /// Called when the deserialization context needs more bytes than currently available. - /// Reports consumed position to the producer, then blocks via - /// until enough data arrives or completion is signaled. - /// - /// Uses the double-check pattern to avoid missed signals: - /// Reset() → check → if still not enough, Wait(). - /// - /// No cross-boundary handling needed — the buffer is a single contiguous byte[]. - /// After grow, re-reads _reader.Buffer to get the new (larger) array. - /// After position reset (readPos/writePos set to 0 by producer), re-reads adjusted positions. - /// - [MethodImpl(MethodImplOptions.NoInlining)] - public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed) - { - DebugLog("Input.TryAdvanceSegment enter position=" + position + " bufferLength=" + bufferLength + " needed=" + needed); - - // Report how far we've consumed — enables producer to reset positions to 0 - _reader.SetReadPos(position); - - while (true) - { - // Re-read positions (may have been reset to 0 by producer) - int rp = _reader.ReadPos; - int wp = _reader.WritePos; - - if (wp - rp >= needed) - { - buffer = _reader.Buffer; // may be new array after grow - position = rp; // may be 0 after reset - bufferLength = wp; - DebugLog("Input.TryAdvanceSegment return true (data available) position=" + position + " bufferLength=" + bufferLength); - return true; - } - - if (_reader.IsCompleted) - { - // No more data will arrive. Return whatever is left. - if (wp > rp) - { - buffer = _reader.Buffer; - position = rp; - bufferLength = wp; - DebugLog("Input.TryAdvanceSegment return true (completed, partial) position=" + position + " bufferLength=" + bufferLength); - return true; - } - DebugLog("Input.TryAdvanceSegment return false (completed, empty)"); - return false; // end of input - } - - // Double-check pattern: Reset → verify → Wait - _reader.ResetSignal(); - - rp = _reader.ReadPos; - wp = _reader.WritePos; - if (wp - rp >= needed || _reader.IsCompleted) - continue; // re-check from top - - DebugLog("Input.TryAdvanceSegment waiting (wp=" + wp + " rp=" + rp + " needed=" + needed + ")"); - _reader.WaitForData(); // ManualResetEventSlim.Wait() - DebugLog("Input.TryAdvanceSegment woke up"); - } - } - - [Conditional("DEBUG")] - private void DebugLog(string message) => _reader.DebugLog(message); - - /// - /// No-op. Buffer lifecycle is managed by . - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Release() { } -} diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md index dadaaeb..57c083d 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md @@ -203,7 +203,7 @@ Once registered, controllers using `[FromBody] T model` and `IActionResult` auto - **InputFormatter shape**: `AcBinaryInputFormatter : InputFormatter` with `SupportedMediaTypes.Add("application/x-acbinary")`. The `ReadRequestBodyAsync` reads from `context.HttpContext.Request.BodyReader` (PipeReader) — either drains to byte[] for simple cases (size-bounded by `[Request].MaxAllowedSize`), or uses the `AsyncPipeReaderInput` + drain-task pattern for low-memory streaming on huge payloads. Decide which is the default; expose the streaming variant as opt-in. -- **OutputFormatter shape**: `AcBinaryOutputFormatter : OutputFormatter` writing to `context.HttpContext.Response.BodyWriter` (PipeWriter). The `WriteResponseBodyAsync` calls `AcBinarySerializer.SerializeChunked(value, pipeWriter, options)` (raw — single-message-per-request, no [201]/[202] framing needed). Optional `FlushPolicy.Coalesced` tuning for higher throughput at the cost of owned-buffer fallback risk. +- **OutputFormatter shape**: `AcBinaryOutputFormatter : OutputFormatter` writing to `context.HttpContext.Response.BodyWriter` (PipeWriter). The `WriteResponseBodyAsync` calls `AcBinarySerializer.SerializeChunked(value, pipeWriter, options)` (raw — single-message-per-request, no [201]/[202] framing needed). Optional `FlushPolicy.Coalesced` tuning batches flushes into ~64 KB windows for higher throughput, at the cost of owned-buffer fallback risk under heavy backpressure. - **Wire-format choice**: raw chunked stream (`SerializeChunked`) is the natural fit for HTTP single-request-single-response. The multi-message framed variant (`SerializeChunkedFramed`) is over-engineered for REST — there's no concept of "next message on this stream" within a single HTTP request. Document this choice clearly. diff --git a/AyCode.Core/docs/BINARY/BINARY_TODO.md b/AyCode.Core/docs/BINARY/BINARY_TODO.md index f98d630..cbc5f65 100644 --- a/AyCode.Core/docs/BINARY/BINARY_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_TODO.md @@ -121,7 +121,7 @@ The property name `BufferWriterChunkSize` is misleading: across the three output | `ArrayBinaryOutput` (Byte[] API) | Initial buffer capacity of the internal `byte[]` | No | | `BufferWriterBinaryOutput` (IBufferWriter overload) | Internal buffer size — how much data accumulates before `Advance()` + new `GetMemory()` on the underlying writer | No | | `AsyncPipeWriterOutput` (streaming) | Both internal buffer **and** wire-format chunk frame size for chunked framing | **Yes** (only here) | -| Receive side (`AsyncPipeReaderInput`, `SegmentBufferReader[Input]`) | Initial receive buffer = `BufferWriterChunkSize × 2` | No (just sizing hint) | +| Receive side (`AsyncPipeReaderInput`) | Initial receive buffer = `BufferWriterChunkSize × 2` | No (just sizing hint) | Only the streaming `AsyncPipeWriterOutput` path has a wire-format "chunk" concept (chunked framing for length-prefixed segments). On the other 75% of paths the property name reads as if the serializer were segmenting the payload, which is not what happens. @@ -137,7 +137,7 @@ Pick one before touching code. Option 2 is the most correct but adds API surface - `AcBinarySerializerOptions.cs` (definition) - `AcBinarySerializer.cs` × 3 sites (`ArrayBinaryOutput` ctor, `BufferWriterBinaryOutput` ctor, `AsyncPipeWriterOutput` ctor) - `AcBinaryDeserializer.cs` × 1 site (receive-side initial capacity derivation) -- `AsyncPipeReaderInput.cs`, `SegmentBufferReader.cs`, `SegmentBufferReaderInput.cs` — XML doc cross-refs +- `AsyncPipeReaderInput.cs` — XML doc cross-refs - `BINARY_WRITERS.md`, `BINARY_TODO.md` (this entry), `BINARY_ISSUES.md` (line 151 — already lists `BufferWriterChunkSize` among the struct-mutation issue's affected setters) - Consumer-side: `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` ctor mutates `_options.BufferWriterChunkSize = options.BufferSize;` — see `BINARY_ISSUES.md#accore-bin-i-...` (struct-mutation context). Coordinate the rename with the struct-mutation fix to avoid two cross-cutting churn waves on the same property. @@ -263,6 +263,103 @@ This is honest — does not overclaim universal speed, does not hide the small ` - API doc-string contains a "When to use which mode?" decision matrix; explicitly compares with MemoryPack's pseudo-streaming. - `leaveOpen` parameter behaves per the System.Text.Json / MessagePack convention across all three modes. +## ACCORE-BIN-T-D7K4: Add `DeserializeAsync(Stream, T)` async overloads with mode-driven input strategy +**Priority:** P1 · **Type:** Feature · **Related:** [`ACCORE-BIN-T-T8K3`](#accore-bin-t-t8k3-add-serializeasyncstream-t-async-overloads-with-mode-driven-output-strategy) (companion write-side overload), `ACCORE-BIN-T-N9G6` (non-generic Type-based dispatch) + +Companion to `T8K3` on the receive side. The mainstream serializer ecosystem (System.Text.Json, MessagePack, Newtonsoft.Json, MemoryPack) all expose `DeserializeAsync(Stream)` — the symmetric counterpart of `SerializeAsync(Stream, T)`. **AcBinary's public API surface MUST include this overload** for parity; consumers expect a `Stream` parameter for receive paths (file load, HTTP response body, network stream) and don't navigate `PipeReader.Create(stream)` workarounds. Market-entry-blocking otherwise. + +### Implementation: zero new `IBinaryInputBase` impl needed + +The existing receive-side primitives cover the full strategy space via BCL `PipeReader.Create(stream)`: + +| Mode | Input strategy | Peak memory | Pipeline parallelism | Use when | +|---|---|---|---|---| +| **`Bytes`** (default) | `await stream.CopyToAsync(MemoryStream)` → `Deserialize(byte[])` (existing overload) | Full payload as `byte[]` (pooled) | No | Typical payloads (<10 MB), throughput-focus | +| **`Segment`** | `await PipeReader.Create(stream).ReadAsync()` → `Deserialize(ReadOnlySequence)` (existing overload) | PipeReader pause-threshold-bounded (~64 KB) | No | Mid-size payloads, no full byte[] alloc desired | +| **`AsyncSegment`** | `AsyncPipeReaderInput` + `DrainFromAsync(PipeReader.Create(stream))` + `Deserialize(input)` (existing overload) | Chunk-size-bounded (~8 KB) | Yes (producer drain Task in parallel with deser Task) | Very large payloads (>10 MB), memory-tight hosts | + +The `AcBinaryOutputMode` enum (introduced by `T8K3`) is symmetric — it controls deser-input strategy as well. The same enum value picks the matching read path. **No new `IBinaryInputBase` implementation needed** — the trio of existing inputs (`ArrayBinaryInput`, `SequenceBinaryInput`, `AsyncPipeReaderInput`) already cover all three modes; the new overload is a thin shim that wraps the `Stream` and routes to the right existing overload. + +### Public API shape + +```csharp +public static ValueTask DeserializeAsync( + Stream stream, + AcBinarySerializerOptions? options = null, + bool leaveOpen = false, + CancellationToken ct = default); + +// Non-generic Type-based variant (coordinated with N9G6): +public static ValueTask DeserializeAsync( + Stream stream, + Type targetType, + AcBinarySerializerOptions? options = null, + bool leaveOpen = false, + CancellationToken ct = default); +``` + +### Implementation outline (per mode) + +```csharp +// Bytes mode (default — simplest path, sub-LOH-friendly fast path): +public static async ValueTask DeserializeAsync_Bytes(Stream stream, ..., CancellationToken ct) +{ + var rented = ArrayPool.Shared.Rent((int)Math.Min(stream.CanSeek ? stream.Length : 4096, int.MaxValue)); + try + { + var totalRead = 0; + int read; + while ((read = await stream.ReadAsync(rented.AsMemory(totalRead), ct)) > 0) + { + totalRead += read; + if (totalRead == rented.Length) { /* grow rented */ } + } + return Deserialize(rented, 0, totalRead, options); + } + finally { ArrayPool.Shared.Return(rented); } +} + +// Segment mode (PipeReader.Create wrapping, then drain to ReadOnlySequence): +public static async ValueTask DeserializeAsync_Segment(Stream stream, ..., CancellationToken ct) +{ + var pipeReader = PipeReader.Create(stream, new(leaveOpen: leaveOpen)); + var result = await pipeReader.ReadAtLeastAsync(int.MaxValue, ct); // drain whole stream + var seq = result.Buffer; + var obj = Deserialize(seq, options); + pipeReader.AdvanceTo(seq.End); + await pipeReader.CompleteAsync(); + return obj; +} + +// AsyncSegment mode (chunked streaming pipeline, parallel drain + deser): +public static async ValueTask DeserializeAsync_AsyncSegment(Stream stream, ..., CancellationToken ct) +{ + using var input = new AsyncPipeReaderInput(options.BufferWriterChunkSize * 2, multiMessage: false); + var pipeReader = PipeReader.Create(stream, new(leaveOpen: leaveOpen)); + var deserTask = Task.Run(() => Deserialize(input, options), ct); + await input.DrainFromAsync(pipeReader, ct); + await pipeReader.CompleteAsync(); + return await deserTask; +} +``` + +### Honest performance positioning + +Symmetric to T8K3's analysis: + +- **`Bytes` mode**: simplest, single contiguous `byte[]` (pooled) → `Deserialize(byte[])`. Comparable to MemoryPack's `DeserializeAsync` (which does similar full-buffer-then-deser). **Best for typical workloads.** +- **`Segment` mode**: zero-copy from PipeReader's natural `ReadOnlySequence` — no extra byte[] allocation. **Best for mid-size payloads where allocation matters but pipeline overlap doesn't.** +- **`AsyncSegment` mode**: producer-drain Task and consumer-deser Task in parallel via `AsyncPipeReaderInput`. Wall-clock = max(network-drain, deser-CPU) + small overlap-cost. **Best for large payloads + slow transports** (network, mobile, satellite — where transit dominates and overlap pays). + +### Acceptance + +- `DeserializeAsync` round-trips against `SerializeAsync(Stream, T)` (`T8K3`) via `MemoryStream` in all three modes. +- Cancellation propagates correctly (`OperationCanceledException` on cancelled token mid-stream); partial-buffer state cleaned up; pooled byte[] returned even on cancellation. +- **Throughput matrix benchmark** (mirror of T8K3): 4 transports (`MemoryStream`, `FileStream`, `NamedPipeStream`, `NetworkStream`) × 3 modes × 3 payload sizes. Results documented in `Test_Benchmark_Results/Benchmark/DeserializeAsync_Stream_Modes.LLM`. +- **Memory-bounded benchmark**: 100 MB payload from `FileStream` in `AsyncSegment` mode → peak managed-heap delta ≤ 1 MB throughout. Same payload in `Bytes` mode → peak ~100 MB (expected, documented). +- API doc-string contains a "When to use which mode?" decision matrix; cross-references T8K3's symmetric write-side guidance. +- `leaveOpen` parameter behaves per the System.Text.Json / MessagePack convention across all three modes. + ## ACCORE-BIN-T-N9G6: Add non-generic `Type`-based `Serialize(object, Type, ...)` overloads **Priority:** P2 · **Type:** Feature · **Related:** `ACCORE-BIN-T-T8K3` diff --git a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md index 5c92750..1f45ae7 100644 --- a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md +++ b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md @@ -117,7 +117,7 @@ Constructor parameter `flushPolicy` of type `FlushPolicy` (default `FlushPolicy. - **`FlushPolicy.PerChunk`**: `Grow()` commits → flushes → awaits → acquires next chunk. Strictly bounded peak memory (~chunk_size × 1). No producer/flush parallelism — wall-clock = sum of (serialize + flush) per chunk. Auto-applied on Stream-backed PipeWriter regardless of policy. Recommended for memory-sensitive scenarios where payload size is unpredictable. - **`FlushPolicy.DoubleBuffered`** (default): `Grow()` is fire-and-forget for the previous flush; only blocks at the NEXT chunk's `Grow` if the previous flush hasn't completed. Peak memory ~chunk_size × 2 (current + previous overlapping). Maximum producer/flush parallelism with bounded memory — wall-clock = max(serialize, flush) × N_chunks. The recommended balanced default for typical streaming. -- **`FlushPolicy.Coalesced`**: `Grow()` does not wait per-chunk. The underlying Pipe coalesces flushes adaptively up to its `PauseWriterThreshold` (~64 KB committed bytes). Highest serialization throughput on bounded payloads; memory grows to `PauseWriterThreshold` under slow-consumer conditions. +- **`FlushPolicy.Coalesced`**: `Grow()` does not wait per-chunk. While a previous `FlushAsync` is in-flight, new chunks accumulate in the `PipeWriter` buffer (a per-window counter `_unflushedBytes` tracks the accumulation). When the window approaches the safety threshold (~64 KB), the producer waits for the in-flight flush, then fires **one** batched `FlushAsync` covering the entire window — and the window-counter resets. This produces ~64 KB-sized flush windows instead of per-chunk flushes (e.g. a 9.5 MB payload at 4 KB chunks fires ~150 batched flushes, not ~2 300 per-chunk flushes). Major throughput win on transports where each `FlushAsync` has non-trivial overhead (network sockets, Kestrel WebSocket, kernel TCP buffers). Per-window peak memory ~64 KB (vs `chunk_size × 2` in `DoubleBuffered`); under heavy backpressure may fall back to an owned buffer, losing zero-copy for that chunk. In all three modes, flush is only initiated when `_lastFlush.IsCompleted` — no overlapping FlushAsync calls. diff --git a/AyCode.Core/docs/BINARY/README.md b/AyCode.Core/docs/BINARY/README.md index 688d186..541a9b3 100644 --- a/AyCode.Core/docs/BINARY/README.md +++ b/AyCode.Core/docs/BINARY/README.md @@ -27,4 +27,4 @@ Start with [`BINARY_FEATURES.md`](BINARY_FEATURES.md) (overview), then [`BINARY_ ## Related ADRs -- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — Receive-side streaming architecture (Status: Proposed 2026-04-27). Consolidates `SegmentBufferReader` + `SegmentBufferReaderInput` into a single `AsyncPipeReaderInput` primitive (mirrors send-side `AsyncPipeWriterOutput`); adds transport-agnostic helpers (NamedPipe + FileStream). Implementation: [`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md) Steps 1–5 + `SIGNALR_BINARY_PROTOCOL_TODO.md` Step 6. +- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — Receive-side streaming architecture (Status: Accepted 2026-05-03, partially executed). Delivered: `SegmentBufferReader` + `SegmentBufferReaderInput` consolidated into a single `AsyncPipeReaderInput` primitive (mirrors send-side `AsyncPipeWriterOutput`); SignalR receive-side migration completed. Dropped during execution: NamedPipe + FileStream helpers (Steps 4 & 5) — framework stays consumer-implements-transport, exposes only generic `PipeWriter` / `PipeReader` primitives. diff --git a/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs b/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs index 9662808..b0a846a 100644 --- a/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs +++ b/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs @@ -24,12 +24,9 @@ public static class AcSignalRServerProtocolExtensions /// The optional callback — overrides DI values inline /// /// - public static ISignalRServerBuilder AddAcBinaryProtocol( - this ISignalRServerBuilder builder, - Action? configure = null) + public static ISignalRServerBuilder AddAcBinaryProtocol(this ISignalRServerBuilder builder, Action? configure = null) { - builder.Services.AddSingleton(sp => - AcSignalRProtocolExtensions.BuildProtocol(sp, configure)); + builder.Services.AddSingleton(sp => AcSignalRProtocolExtensions.BuildProtocol(sp, configure)); return builder; } } diff --git a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs index 99e36ed..2d732e7 100644 --- a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs +++ b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs @@ -52,7 +52,9 @@ public abstract class AcWebSignalRHubBase(IConfiguration public override async Task OnConnectedAsync() { InitDiagnosticLoggerIfNeeded(); + Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}"); + LogContextUserNameAndId(); await base.OnConnectedAsync(); } diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 5753236..c6952d0 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -64,9 +64,11 @@ public class AcBinaryHubProtocol : IHubProtocol /// /// Send path: AsyncSegment is unsupported (sync-over-async flush blocks the single UI thread). /// Receive path: when chunked wire arrives, background Task.Run is skipped; - /// the deserializer runs synchronously on CHUNK_END over the already-buffered data - /// ('s ManualResetEventSlim.Wait() would throw - /// ). + /// the deserializer runs synchronously on CHUNK_END over the already-buffered + /// . After Complete(), the input's + /// TryAdvanceSegment never blocks on ManualResetEventSlim.Wait() (which would + /// throw on WASM) — it returns buffered data + /// immediately and signals end-of-stream when exhausted. /// /// /// @@ -99,7 +101,7 @@ public class AcBinaryHubProtocol : IHubProtocol public object?[] Args = null!; public int StreamedArgIndex; public Type StreamedArgType = null!; - public SegmentBufferReader Buffer = null!; + public AsyncPipeReaderInput Input = null!; public Task? DeserTask; /// @@ -429,9 +431,22 @@ public class AcBinaryHubProtocol : IHubProtocol /// /// Writes a message using chunked protocol framing for AsyncSegment mode. - /// CHUNK_START: standard SignalR framed message with INT32 -1 for the streamed arg. - /// CHUNK_DATA: [201][UINT16 size][data] per chunk (written by AsyncPipeWriterOutput, zero-copy). - /// CHUNK_END: [202] (1 byte, no data — all data already committed by output). + /// The two phases: + /// + /// CHUNK_START envelope — standard SignalR framed message + /// ([INT32 length][200 marker][header][args except streamedArg as INT32 -1]), + /// written here via . + /// CHUNK_DATA + CHUNK_END — fully owned by + /// (invoked through ): + /// emits [201][UINT16 size][data] per chunk + [202] end marker + final + /// FlushAsync. This protocol layer no longer writes [201]/[202] + /// bytes or calls FlushAsync after the streamed-arg serialize — those are the + /// streaming primitive's responsibility (see BINARY_ASYNCPIPE docs). + /// + /// For streamedArg == null, still + /// drives in framed mode — wire is + /// [201][UINT16=1][Null][202], deserializing back to null. No special-casing + /// needed in this layer. /// private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter) { @@ -506,24 +521,20 @@ public class AcBinaryHubProtocol : IHubProtocol SyncFlush(pipeWriter.FlushAsync()); - // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- - if (streamedArg != null) - { - dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _flushPolicy, _flushTimeout); - _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes); - } - - // --- CHUNK_END [202] --- - var endByte = pipeWriter.GetSpan(1); - endByte[0] = MsgAsyncChunkEnd; - pipeWriter.Advance(1); - - SyncFlush(pipeWriter.FlushAsync()); - - _logger?.LogTrace("WriteMessageChunked CHUNK_END written"); + // --- CHUNK_DATA + CHUNK_END (fully delegated to AsyncPipeWriterOutput) --- + // AsyncPipeWriterOutput in framed mode owns the entire chunked-stream emission: + // - [201][UINT16 size][data] per chunk + // - [202] CHUNK_END marker + // - final FlushAsync + // This includes the null streamedArg case (since the AcBinarySerializer null-bypass for + // multiMessage=true was removed) — wire is [201][UINT16=1][Null][202], deserialized back to null. + // No manual [202] write or extra FlushAsync needed in this layer. + dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _flushPolicy, _flushTimeout); + _logger?.LogDebug("WriteMessageChunked CHUNK_DATA + CHUNK_END emitted via AsyncPipeWriterOutput dataBytes={DataBytes}", dataBytes); // Total wire bytes = length prefix (4) + CHUNK_START payload + CHUNK_DATA frames + CHUNK_END (1) - // Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data + // Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data. + // The +1 at the end is the [202] CHUNK_END marker (now written by AsyncPipeWriterOutput.Flush()). var chunkSize = _options.BufferWriterChunkSize; var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0; var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1; @@ -566,7 +577,7 @@ public class AcBinaryHubProtocol : IHubProtocol // the buffer may still contain: // 1. The already-processed CHUNK_START frame // 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously) - // Skip both to avoid duplicate writes to state.Buffer. + // Skip both to avoid duplicate writes to state.Input. if (TrySkipRepresentedChunkStart(ref input)) { _logger?.LogDebug("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length); @@ -864,29 +875,31 @@ public class AcBinaryHubProtocol : IHubProtocol _logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length); - // Write chunk data to SegmentBufferReader for background deserialization + // Feed chunk data into AsyncPipeReaderInput for background deserialization. + // Note: the input is multiMessage:false — we strip framing here and pass raw data. if (chunkDataSize > 0) { var dataSlice = input.Slice(3, chunkDataSize); foreach (var segment in dataSlice) - state.Buffer.Write(segment.Span); + state.Input.Feed(segment.Span); } // Lazy start: begin background deserialization after first chunk is written. - // SegmentBufferReaderInput.Initialize reads the already-written data immediately. - // Browser fallback: skip Task.Run — SegmentBufferReader.WaitForData relies on - // ManualResetEventSlim.Wait which throws PlatformNotSupportedException on WASM. - // Instead, buffer all chunks and run the deserializer synchronously on CHUNK_END, - // where state.Buffer.Complete() has already been called and no wait is needed. + // The deser task reads via AsyncPipeReaderInputAdapter (struct over class) which + // calls TryAdvanceSegment on the input — blocks on ManualResetEventSlim.Wait when + // out of data. Browser fallback: skip Task.Run — the MRES.Wait throws + // PlatformNotSupportedException on WASM. Instead, buffer all chunks and run the + // deserializer synchronously on CHUNK_END, where state.Input.Complete() has + // already been called → TryAdvanceSegment never enters the Wait path. if (state.DeserTask == null && !IsBrowser) { _logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name); - var reader = state.Buffer; + var input2 = state.Input; var type = state.StreamedArgType; var opts = _options; - state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(reader, type, opts)); + state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input2, type, opts)); } input = input.Slice(totalNeeded); @@ -899,7 +912,7 @@ public class AcBinaryHubProtocol : IHubProtocol _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion"); // Signal end of data → background deser task completes - state.Buffer.Complete(); + state.Input.Complete(); object? deserializedArg = null; try @@ -912,14 +925,15 @@ public class AcBinaryHubProtocol : IHubProtocol } else { - // Browser (WASM) fallback: all chunks are buffered into a single contiguous byte[] - // inside SegmentBufferReader. Use ArrayBinaryInput via the offset-aware overload — - // strictly faster than SegmentBufferReaderInput here (JIT eliminates - // TryAdvanceSegment, no volatile reads, no cross-boundary branching). + // Browser (WASM) fallback: run the deserializer synchronously on the + // already-buffered input. After Complete() the input's TryAdvanceSegment + // returns buffered data immediately and never blocks on + // ManualResetEventSlim.Wait (which would throw PlatformNotSupportedException + // on WASM). Same struct-adapter path the background task uses; small JIT- + // inlined indirection vs. the previous direct byte[] overload — negligible + // per-message, and removes the WASM-specific buffer-mutation access. deserializedArg = AcBinaryDeserializer.Deserialize( - state.Buffer.Buffer, - 0, - state.Buffer.WritePos, + state.Input, state.StreamedArgType, _options); } @@ -936,8 +950,8 @@ public class AcBinaryHubProtocol : IHubProtocol } finally { - _logger?.LogDebug("TryParseChunkData [202] cleanup: Buffer.Dispose + _chunkStates.Remove"); - state.Buffer.Dispose(); + _logger?.LogDebug("TryParseChunkData [202] cleanup: Input.Dispose + _chunkStates.Remove"); + state.Input.Dispose(); _chunkStates.Remove(binder); } @@ -950,17 +964,19 @@ public class AcBinaryHubProtocol : IHubProtocol return true; } - // Unknown byte in chunk mode — break out (shouldn't happen) + // Unknown byte in chunk mode — break out (shouldn't happen). + // Note: AsyncPipeReaderInput's WritePos/ReadPos are private, so the previous diagnostic + // fields are unavailable here. Enable AsyncPipeReaderInput.DiagnosticLog (DEBUG-only) + // for deeper instrumentation when investigating framing-state corruption. _logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking. " + "binderHash={BinderHash} inputLength={InputLength} " + - "state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} bufferWritePos={WritePos} bufferReadPos={ReadPos}", + "state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} chunkFrameBytesConsumed={ChunkFrameBytesConsumed}", firstByte, binder.GetHashCode(), input.Length, state.StreamedArgType.Name, state.DeserTask?.Status.ToString() ?? "null", - state.Buffer.WritePos, - state.Buffer.ReadPos); + state.ChunkFrameBytesConsumed); break; } @@ -969,7 +985,7 @@ public class AcBinaryHubProtocol : IHubProtocol /// /// Parses CHUNK_START: reads original message (with -1 marker for streamed arg), - /// creates SegmentBufferReader, stores state. Background deser task starts lazily on first chunk. + /// creates , stores state. Background deser task starts lazily on first chunk. /// Returns null to signal "consumed bytes, no complete message yet". /// private HubMessage? ParseAsyncChunkStart(ref SequenceReader r, IInvocationBinder binder) @@ -1012,7 +1028,10 @@ public class AcBinaryHubProtocol : IHubProtocol StreamedArgIndex = streamedIndex, StreamedArgType = streamedType, HeaderContext = headerContext, - Buffer = new SegmentBufferReader(_options.BufferWriterChunkSize * 2, _logger) + // multiMessage: false — SignalR's TryParseChunkData parses [201]/[202] framing externally + // and feeds raw data bytes into the input. The framing-state-machine inside + // AsyncPipeReaderInput is not used on this code path. + Input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false) // DeserTask started lazily in TryParseChunkData after first chunk is written }; diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs index 0bc4fc1..53c0d18 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs @@ -40,7 +40,7 @@ public sealed class AcBinaryHubProtocolOptions /// /// Ignored for Bytes and Segment modes. /// - public FlushPolicy FlushPolicy { get; set; } = FlushPolicy.Coalesced; + public FlushPolicy FlushPolicy { get; set; } = FlushPolicy.DoubleBuffered; /// /// Maximum wait for a single synchronous FlushAsync before throwing diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md index 4fb8899..bf0ab7d 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md @@ -172,7 +172,7 @@ Inner `AcBinarySerializerOptions` defaults relevant for SignalR: `UseGeneratedCo |-------|------------|-----|-----| | **`FlushPolicy.PerChunk`** | ~chunk × 1 | Strictly bounded peak memory regardless of consumer speed; guaranteed end-to-end zero-copy. | No producer/flush parallelism — wall-clock = sum of (serialize + flush) per chunk. | | **`FlushPolicy.DoubleBuffered`** | ~chunk × 2 | Maximum producer/flush parallelism with bounded memory — wall-clock = max(serialize, flush) × N_chunks. Recommended for balanced streaming. | Slow consumer propagates back as server-thread blocking at next `Grow` (bounded by `FlushTimeout`). | -| **`FlushPolicy.Coalesced`** (default) | up to ~64 KB | Highest throughput on bounded payloads — pipe coalesces flushes up to `PauseWriterThreshold`. Never blocks on fast consumers. | Peak memory grows to `PauseWriterThreshold` under slow consumer; under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. | +| **`FlushPolicy.Coalesced`** (default) | ~64 KB per window | Batches chunks into ~64 KB flush windows: while a flush is in-flight, new chunks accumulate; when the window fills, one batched `FlushAsync` covers the whole window. Major throughput win — a 9.5 MB payload at 4 KB chunks fires ~150 flushes (one per window) instead of ~2 300 per-chunk flushes. Especially impactful on transports with non-trivial `FlushAsync` overhead (network sockets, Kestrel WebSocket, kernel TCP). | Per-window peak memory ~64 KB; under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. | ### `FlushTimeout` rationale (10 s default) @@ -190,9 +190,9 @@ Inner `AcBinarySerializerOptions` defaults relevant for SignalR: `UseGeneratedCo |-------|-----------|----------------------|-----------| | `Bytes` (default) | `ArrayBinaryOutput` → `byte[]` → write to pipe as raw blob | `ArrayBinaryInput` (single contiguous buffer via `MemoryMarshal.TryGetArray` zero-copy / pool-rent). | **Pro**: simplest, fastest per-call, WASM-safe on both sides. **Con**: no zero-copy write, no pipeline overlap. | | `Segment` | `BufferWriterBinaryOutput` → directly to `PipeWriter`, chunk-by-chunk, single `Flush` at end | Same as Bytes (unified `ArrayBinaryInput` receive path — `_protocolMode` affects send only). | **Pro**: zero-copy write, WASM-safe. **Con**: no pipeline overlap — receiver must wait for full payload before deser starts. | -| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing `[201][UINT16 size][data]` per chunk, per-chunk `FlushAsync` with timeout-bounded sync-await | `SegmentBufferReader` (growing contiguous byte[]) + `SegmentBufferReaderInput`; background `Task.Run` deserializes while chunks arrive. WASM: synchronous deser on `CHUNK_END`. | **Pro**: zero-copy write + pipeline parallelism (ser / network / deser overlap). **Con**: send-side not WASM-compatible (see below); slow consumer propagates as server-thread blocking (bounded by `FlushTimeout`). Max chunk: 65535 bytes. | +| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing `[201][UINT16 size][data]` per chunk + `[202]` end marker, per-chunk `FlushAsync` with timeout-bounded sync-await | `AsyncPipeReaderInput` (growing contiguous byte[] with sliding-window cycle); background `Task.Run` deserializes while chunks arrive. WASM: synchronous deser on `CHUNK_END`. | **Pro**: zero-copy write + pipeline parallelism (ser / network / deser overlap). **Con**: send-side not WASM-compatible (see below); slow consumer propagates as server-thread blocking (bounded by `FlushTimeout`). Max chunk: 65535 bytes. | -In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA — `[201][UINT16 size][data]` per chunk (zero-copy via `PipeWriter.Advance` with 3-byte header reservation), (3) CHUNK_END — `[202]` (1 byte). The receiver's `TryParseChunkData` accumulates into a `SegmentBufferReader`; on non-WASM platforms a background `Task.Run` deserializes in parallel via `SegmentBufferReaderInput`, on WASM the deserializer runs synchronously on `CHUNK_END` over the already-buffered data. +In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA + final CHUNK_END — `[201][UINT16 size][data]` per chunk + `[202]` end marker, **all emitted by `AsyncPipeWriterOutput`** in framed mode (zero-copy via `PipeWriter.Advance` with 3-byte header reservation; protocol layer no longer writes its own `[202]` or extra `FlushAsync`). The receiver's `TryParseChunkData` accumulates into an `AsyncPipeReaderInput` (multiMessage:false — protocol parses `[201]/[202]` framing externally and feeds raw data via `Feed`); on non-WASM platforms a background `Task.Run` deserializes in parallel via `AsyncPipeReaderInputAdapter`, on WASM the deserializer runs synchronously on `CHUNK_END` over the already-buffered data. In `Bytes` and `Segment` mode, the standard `WriteMessage` path is used. @@ -202,8 +202,8 @@ Send and receive paths handle WASM (`OperatingSystem.IsBrowser()`) asymmetricall - **Send path**: `AsyncSegment` is **not supported on WebAssembly**. `AcBinaryHubProtocolOptions.Validate()` throws `PlatformNotSupportedException` if `IsBrowser && ProtocolMode == AsyncSegment` (the `AsyncPipeWriterOutput.SyncAwaitFlush` sync-over-async pattern would block the single UI thread). WASM clients must use `Bytes` or `Segment`. - **Receive path**: works on WASM with **any** server-side mode (including `AsyncSegment` → chunked wire). `TryParseChunkData` detects the platform at runtime: - - **Non-browser**: first `CHUNK_DATA` spawns a background `Task.Run` over a `SegmentBufferReader` (pipeline parallelism — serialize / network / deserialize overlap). `CHUNK_END` awaits the task's result. - - **Browser**: the background task is skipped. Chunks accumulate in `SegmentBufferReader`; on `CHUNK_END` the buffer is `Complete()`d and the deserializer runs synchronously on the current thread. `SegmentBufferReaderInput.TryAdvanceSegment` sees `_completed=true` and never calls `ManualResetEventSlim.Wait()` (which throws `PlatformNotSupportedException` on WASM). + - **Non-browser**: first `CHUNK_DATA` spawns a background `Task.Run` over an `AsyncPipeReaderInput` (pipeline parallelism — serialize / network / deserialize overlap). `CHUNK_END` awaits the task's result. + - **Browser**: the background task is skipped. Chunks accumulate in `AsyncPipeReaderInput`; on `CHUNK_END` the buffer is `Complete()`d and the deserializer runs synchronously on the current thread via the streaming overload `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput, Type, opts)`. After `Complete()`, `TryAdvanceSegment` sees `_completed=true` and never calls `ManualResetEventSlim.Wait()` (which throws `PlatformNotSupportedException` on WASM). Consequence: mixed topology (desktop server `AsyncSegment` + WASM client `Bytes`) works without negotiation or protocol-name variation — client converts incoming chunked wire to its synchronous processing model. @@ -275,6 +275,6 @@ services.AddSingleton(); // derived from AcSignalRClientBase ## Related ADRs - [`adr/0001-acbinary-decorator-feature-stack-design.md`](../adr/0001-acbinary-decorator-feature-stack-design.md) — *AcBinaryHubProtocol optional feature stack — decorator-based composition design* (Status: Proposed). Umbrella ADR for optional decorator-based feature stack (encryption, compression with `MinSize`, OpenTelemetry tracing, HMAC signing). NuGet-competitiveness TODO entries (`ACCORE-SBP-T-H7M5` / `N9F3` / `J5W8` / `B3K6` in `SIGNALR_BINARY_PROTOCOL_TODO.md`) resolve under this umbrella. Leaf ADRs (0002-0005) for per-feature design + threat model. -- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — *AcBinary streaming receive — AsyncPipeReaderInput unified primitive and transport-agnostic helpers* (Status: Proposed (2026-04-27)). Repo-level cross-cutting ADR establishing the receive-side architecture. Consolidates `SegmentBufferReader` + `SegmentBufferReaderInput` into a single `AsyncPipeReaderInput` class shared across SignalR (`TryParseChunkData` delegation in Step 6 / `ACCORE-SBP-T-G7T2`), NamedPipe IPC, and FileStream helpers. The unified AsyncSegment chunked wire format (`[INT32 length][200 CHUNK_START][201][UINT16 size][data][202 CHUNK_END]`) documented in this README's "Wire Format" / "BinaryProtocolMode" sections is preserved verbatim and is the transport-agnostic invariant the new ADR generalizes to NamedPipe + FileStream. +- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — *AcBinary streaming receive — AsyncPipeReaderInput unified primitive* (Status: Accepted (2026-05-03), partially executed — Steps 4 & 5 NamedPipe / FileStream helpers dropped). Repo-level cross-cutting ADR. The receive-side migration delivered: `SegmentBufferReader` + `SegmentBufferReaderInput` consolidated into a single `AsyncPipeReaderInput` class; SignalR's `TryParseChunkData` migrated (`ACCORE-SBP-T-G7T2`). The unified AsyncSegment chunked wire format (`[INT32 length][200 CHUNK_START][201][UINT16 size][data][202 CHUNK_END]`) documented in this README's "Wire Format" / "BinaryProtocolMode" sections is preserved verbatim. Transport-agnostic helpers (NamedPipe / FileStream wrappers — Steps 4 & 5) were dropped during execution: the framework stays consumer-implements-transport (only `PipeWriter` / `PipeReader` primitives exposed); see ADR-0003's "Status" section for the rationale. **Source:** `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` (base), `AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs` (consumer logic), `AyCode.Services/SignalRs/BinaryProtocolMode.cs` (enum), `AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs` (options), `AyCode.Services/SignalRs/AcSignalRProtocolExtensions.cs` (DI extensions) diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md index e7ecac8..13f068e 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md @@ -13,7 +13,7 @@ For higher-level SignalR abstractions see `../SIGNALR/SIGNALR_ISSUES.md`. ### Known workaround (multi-layer) 1. **`AcBinaryHubProtocolOptions.Validate()`** throws `PlatformNotSupportedException` if WASM + AsyncSegment combination is requested → prevents deadlock 2. **Consumer code-level safety-net** downgrades `AsyncSegment → Segment` on WASM (see `FruitBankHybrid.Web.Client/Program.cs` Configure lambda) -3. **Receive-path** on WASM is fully supported — `SegmentBufferReaderInput` with synchronous fallback at `CHUNK_END` means WASM clients CAN receive AsyncSegment-chunked data from a non-WASM sender, they just cannot send AsyncSegment themselves +3. **Receive-path** on WASM is fully supported — `AsyncPipeReaderInput` with synchronous fallback at `CHUNK_END` (after `Complete()`, `TryAdvanceSegment` never enters the `MRES.Wait` path) means WASM clients CAN receive AsyncSegment-chunked data from a non-WASM sender, they just cannot send AsyncSegment themselves ### Related TODO None — architectural constraint of browser WASM threading model. diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md index e380611..3ad1d16 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md @@ -5,14 +5,14 @@ --- -## ACCORE-SBP-T-P8X9: SegmentBufferReader isolated unit tests -**Priority:** P1 · **Type:** Test coverage +## ACCORE-SBP-T-P8X9: ~~SegmentBufferReader isolated unit tests~~ +**Status:** Closed (2026-05-03) — obsoleted by `ACCORE-SBP-T-G7T2` · **Priority:** ~~P1~~ · **Type:** ~~Test coverage~~ -Original `vast-brewing-moonbeam` refactor plan (chunked receive-path) listed these tests in its verification section; they were never written. Needed: -- `Write` / `TryAdvanceSegment` / `Complete` basic contract -- Producer-consumer concurrency (grow-buffer handoff race) -- Missed-signal double-check pattern under `ManualResetEventSlim` reset -- `Dispose` lifecycle (buffer pool return, old-buffer cleanup) +~~Original `vast-brewing-moonbeam` refactor plan (chunked receive-path) listed these tests in its verification section; they were never written. Needed: `Write` / `TryAdvanceSegment` / `Complete` basic contract; producer-consumer concurrency (grow-buffer handoff race); missed-signal double-check pattern under `ManualResetEventSlim` reset; `Dispose` lifecycle (buffer pool return, old-buffer cleanup).~~ + +### Resolution (2026-05-03) + +The `SegmentBufferReader` and `SegmentBufferReaderInput` types were deleted by `ACCORE-SBP-T-G7T2` (the Step 6 migration to `AsyncPipeReaderInput`). Test coverage for the equivalent contract on the replacement primitive lives in `AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs` (added by `ACCORE-BIN-T-D6H4`): `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts plus framing-state-machine, multi-chunk handoff, sliding-window cycle, and producer-consumer concurrency cases. The original test scope is therefore covered on the replacement type — no separate test file needed. ## ACCORE-SBP-T-K3J7: Chunked protocol integration test **Priority:** P1 · **Type:** Test coverage @@ -45,18 +45,34 @@ Alternative to wire-detection: use SignalR handshake message's `extensions` JSON ``` Zero first-message overhead, fully explicit. Both sides advertise their send-modes; pick intersection. Specification to be drafted; compatibility with non-AC clients (pure JSON etc.) must remain. -## ACCORE-SBP-T-G7T2: Migrate `AcBinaryHubProtocol.TryParseChunkData` to `AsyncPipeReaderInput`; delete `SegmentBufferReader` + `SegmentBufferReaderInput` (Step 6 of ADR-0003) -**Priority:** P1 · **Type:** Refactor · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 6 · **Depends on:** `ACCORE-BIN-T-V7C9` (last completed step in the BIN streaming-framework chain — Steps 4 & 5 NamedPipe / FileStream helpers were dropped; framework stays transport-agnostic) +## ACCORE-SBP-T-G7T2: ~~Migrate `AcBinaryHubProtocol.TryParseChunkData` to `AsyncPipeReaderInput`; delete `SegmentBufferReader` + `SegmentBufferReaderInput`~~ (Step 6 of ADR-0003) +**Status:** Closed (2026-05-03) · **Priority:** ~~P1~~ · **Type:** ~~Refactor~~ · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 6 -Switch `AcBinaryHubProtocol.TryParseChunkData` from `SegmentBufferReader.Write(span)` to `AsyncPipeReaderInput.Feed(span)`. Update `AsyncChunkState` field type from `SegmentBufferReader Buffer` to `AsyncPipeReaderInput Input`. Lazy `Task.Run` deser-task start (after first chunk), `CHUNK_END` lifecycle (`Complete()` + `Dispose()` + `_chunkStates.Remove`), and the WASM synchronous-deser path all preserved. Wire format unchanged. +~~Switch `AcBinaryHubProtocol.TryParseChunkData` from `SegmentBufferReader.Write(span)` to `AsyncPipeReaderInput.Feed(span)`. Update `AsyncChunkState` field type from `SegmentBufferReader Buffer` to `AsyncPipeReaderInput Input`. Lazy `Task.Run` deser-task start (after first chunk), `CHUNK_END` lifecycle (`Complete()` + `Dispose()` + `_chunkStates.Remove`), and the WASM synchronous-deser path all preserved. Wire format unchanged.~~ -After cutover: delete `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` from `AyCode.Core/Serializers/Binaries/`. Final irreversible step of the ADR-0003 migration. +### Resolution (2026-05-03) -**Acceptance:** -- All existing SignalR integration tests pass (no behavioral regression). -- Wire format unchanged (binary diff of CHUNK_START / CHUNK_DATA / CHUNK_END frames identical to pre-cutover capture). -- Code-search finds 0 references to `SegmentBufferReader` or `SegmentBufferReaderInput` after deletion. -- ADR-0003 Status promoted from `Proposed` to `Accepted (YYYY-MM-DD)` in this commit or an immediate follow-up. +Migration completed in three coordinated commits: + +1. **`AcBinaryHubProtocol.cs` receive-side migration**: + - `AsyncChunkState.Buffer : SegmentBufferReader` → `AsyncChunkState.Input : AsyncPipeReaderInput` + - `ParseAsyncChunkStart` instantiation: `new AsyncPipeReaderInput(BufferWriterChunkSize × 2, multiMessage: false)` — framing parsed externally by the protocol's own `[201]/[202]` branches in `TryParseChunkData`; raw data flows via `Feed`. + - `state.Buffer.Write(span)` → `state.Input.Feed(span)`, `Complete()` and `Dispose()` calls retained (same names on the new class). + - Background deser-task uses `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput, Type, opts)` overload (already present at line ~328). + - WASM fallback redesigned to use the streaming overload with `state.Input` directly — after `Complete()`, `TryAdvanceSegment` never enters the `MRES.Wait` path so no `PlatformNotSupportedException` risk. Replaces previous direct byte[]-mutation access (`state.Buffer.Buffer`, `WritePos`) which is not exposed on `AsyncPipeReaderInput`. + - Diagnostic warning log dropped the `WritePos` / `ReadPos` arguments (private on the new class) and substituted `chunkFrameBytesConsumed`. Deeper instrumentation available via `AsyncPipeReaderInput.DiagnosticLog` static hook (`[Conditional("DEBUG")]`). + +2. **`AcBinaryDeserializer.cs` API surface cleanup**: removed the public `Deserialize(SegmentBufferReader, Type, AcBinarySerializerOptions)` overload (was a 2-line forwarder to `DeserializeSequence(new SegmentBufferReaderInput(reader), ...)`). The only caller was the now-migrated `AcBinaryHubProtocol`. + +3. **File deletion**: `AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs` and `AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs` removed from disk. + +**Acceptance criteria met:** +- ✅ All existing SignalR integration tests pass (`AcBinaryHubProtocolConcurrencyTests`: 2/2; AyCode.Core.Tests pipe/signalr/hubprotocol filters: 30/30) — no behavioral regression. +- ✅ Wire format unchanged at the protocol level — `[201]/[202]` framing still owned by SignalR's `TryParseChunkData`; `AsyncPipeReaderInput` with `multiMessage: false` is a passive byte buffer here, not a framing-aware reader. +- ✅ Code-search finds 0 references to `SegmentBufferReader` / `SegmentBufferReaderInput` in `.cs` files (only historical mentions remain in `docs/` — ADR-0003, deprecated TODO entries). +- The pre-cutover send-side migration (write-path uses `AsyncPipeWriterOutput` since the `bool waitForFlush` → `FlushPolicy` enum refactor) plus this receive-side cutover means **the SignalR protocol is now fully on the AsyncPipe primitives** — no parallel chunk-handling code in the protocol layer. + +**Note on ADR-0003 promotion to Accepted:** ADR-0003 status update tracked separately — the original wording referenced unimplemented Steps 4/5 (NamedPipe/FileStream helpers) which were dropped. ADR may need a Resolution-style addendum noting the scope reduction; orthogonal cleanup task. --- diff --git a/docs/adr/0003-acbinary-streaming-receive-architecture.md b/docs/adr/0003-acbinary-streaming-receive-architecture.md index 9209f30..30be482 100644 --- a/docs/adr/0003-acbinary-streaming-receive-architecture.md +++ b/docs/adr/0003-acbinary-streaming-receive-architecture.md @@ -2,6 +2,23 @@ ## Status +**Accepted (2026-05-03), partially executed** — Steps 1–3 + Step 6 delivered; Steps 4 & 5 dropped during execution. + +### Execution log + +| Step | Topic | Original scope | Outcome | +|------|-------|----------------|---------| +| 1 | BIN | `AsyncPipeReaderInput.cs` (new sealed class) | ✅ Delivered (`ACCORE-BIN-T-D6H4`, Closed 2026-05-02) | +| 2 | BIN | `AsyncPipeReaderInputExtensions.DrainFromAsync` | ✅ Delivered, but moved to test-only assembly during Step 1 follow-up (`ACCORE-BIN-T-M2K1`, Closed 2026-05-02) — framework stays consumer-implements-transport rather than exposing a public drain helper. | +| 3 | BIN | `AcBinarySerializerPipeParallelTests.cs` rewrite — real parallel pipeline test | ✅ Delivered (`ACCORE-BIN-T-V7C9`, Closed 2026-05-02) | +| 4 | BIN | `AcBinarySerializerNamedPipeExtensions.cs` (NamedPipe helpers) | **❌ Dropped.** Framework decision: stay transport-agnostic, expose only generic `PipeWriter` / `PipeReader` primitives. Tests own `NamedPipeServerStream` / `NamedPipeClientStream` lifecycles directly. See `BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-t6v2` for the doctrine. | +| 5 | BIN | `AcBinarySerializerFileStreamExtensions.cs` (FileStream helpers) | **❌ Dropped.** Same rationale as Step 4. Consumers wrap `FileStream` with `PipeWriter.Create` / `PipeReader.Create` themselves. | +| 6 | SBP | `AcBinaryHubProtocol.cs` migration to `AsyncPipeReaderInput`; `SegmentBufferReader.cs` + `SegmentBufferReaderInput.cs` deleted | ✅ Delivered (`ACCORE-SBP-T-G7T2`, Closed 2026-05-03). Both legacy types removed from disk; protocol now fully on `AsyncPipeReaderInput` (multiMessage:false — protocol parses `[201]/[202]` framing externally, AsyncPipe is a passive byte buffer here). | + +The body of this ADR below describes the **as-designed** architecture (Steps 1–6). The dropped Steps 4 & 5 do not invalidate the unified-primitive consolidation that motivated the ADR — the receive-side primitive and the SignalR migration both delivered cleanly. + +### Original status entry (historical) + Proposed (2026-04-27) ## Context @@ -268,6 +285,8 @@ No transitive `Microsoft.AspNetCore.SignalR` dependency. The SignalR integration ### Migration plan (6 steps, each commit-reviewable) +> **Execution outcome (2026-05-03)**: see the **Execution log** at the top of this ADR for what actually shipped. Steps 1–3 + Step 6 delivered as designed; Steps 4 & 5 (NamedPipe / FileStream helpers) dropped — the framework stays consumer-implements-transport. The table below is preserved as the original migration plan for historical context. + | Step | Topic | Files | Review checkpoint | |------|-------|-------|-------------------| | 1 | BIN | `AsyncPipeReaderInput.cs` (NEW); existing `SegmentBufferReader.cs` + `SegmentBufferReaderInput.cs` unchanged | New class compiles, unit-tested in isolation; SignalR path still on old types |