From 97ac3e21a3c6b9d06fbe06d7b6560525b5843c80 Mon Sep 17 00:00:00 2001 From: Loretta Date: Sun, 3 May 2026 15:21:15 +0200 Subject: [PATCH] [LOADED_DOCS: 3 files, no new loads] Remove SegmentBufferReader; unify on AsyncPipeReaderInput Migrates all SignalR chunked streaming receive logic to AsyncPipeReaderInput, fully removing SegmentBufferReader and SegmentBufferReaderInput from the codebase. Updates all references, deserialization paths, and documentation to reflect the new unified primitive. Marks ADR-0003 as accepted (partially executed), closes related TODOs, and clarifies protocol docs. Sets DoubleBuffered as the default FlushPolicy. No wire format or behavioral changes; all tests pass. --- .github/skills/docs-discovery/SKILL.md | 2 +- .../AyCode.Core.Serializers.Console.csproj | 41 ++-- AyCode.Core.Serializers.Console/Program.cs | 4 +- .../Binaries/AcBinaryDeserializer.cs | 9 - .../Binaries/AcBinarySerializer.cs | 22 +- .../Binaries/AsyncPipeReaderInput.cs | 7 +- .../Binaries/AsyncPipeReaderInputAdapter.cs | 6 +- .../Binaries/AsyncPipeWriterOutput.cs | 52 +++- .../Serializers/Binaries/FlushPolicy.cs | 27 ++- .../Binaries/SegmentBufferReader.cs | 228 ------------------ .../Binaries/SegmentBufferReaderInput.cs | 130 ---------- .../docs/BINARY/BINARY_ASYNCPIPE_TODO.md | 2 +- AyCode.Core/docs/BINARY/BINARY_TODO.md | 101 +++++++- AyCode.Core/docs/BINARY/BINARY_WRITERS.md | 2 +- AyCode.Core/docs/BINARY/README.md | 2 +- .../AcSignalRServerProtocolExtensions.cs | 7 +- .../SignalRs/AcWebSignalRHubBase.cs | 2 + .../SignalRs/AcBinaryHubProtocol.cs | 117 +++++---- .../SignalRs/AcBinaryHubProtocolOptions.cs | 2 +- .../docs/SIGNALR_BINARY_PROTOCOL/README.md | 12 +- .../SIGNALR_BINARY_PROTOCOL_ISSUES.md | 2 +- .../SIGNALR_BINARY_PROTOCOL_TODO.md | 48 ++-- ...acbinary-streaming-receive-architecture.md | 19 ++ 23 files changed, 343 insertions(+), 501 deletions(-) delete mode 100644 AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs delete mode 100644 AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs diff --git a/.github/skills/docs-discovery/SKILL.md b/.github/skills/docs-discovery/SKILL.md index 27eff59..cf1e5bc 100644 --- a/.github/skills/docs-discovery/SKILL.md +++ b/.github/skills/docs-discovery/SKILL.md @@ -18,7 +18,7 @@ This skill READS `.md` files and updates the LLM's `[LOADED_DOCS: ...]` state. I Parse the user's most recent message (and the wider conversation tail if relevant) for concrete concepts. Examples: -- Class / type names: `AcLoggerBase`, `SegmentBufferReader`, `AcBinaryHubProtocol`, `SignalRClient` (any derived/consumer-specific type) +- Class / type names: `AcLoggerBase`, `AsyncPipeReaderInput`, `AcBinaryHubProtocol`, `SignalRClient` (any derived/consumer-specific type) - Feature areas: "logger", "log writer", "serializer", "SignalR", "hub protocol", "chunked framing", "connection builder", "options" - File hints: `Program.cs`, `AcLoggerBase.cs`, `SIGNALR.md` - Patterns / idioms: "DI factory", "appsettings", "mode negotiation" diff --git a/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj b/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj index 0f900d8..39a0534 100644 --- a/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj +++ b/AyCode.Core.Serializers.Console/AyCode.Core.Serializers.Console.csproj @@ -1,21 +1,32 @@  - - - - + + + + - - - - - + + + + + - - Exe - net9.0 - enable - enable - + + Exe + net9.0 + enable + enable + + + true + true + + + + true + false + + true + diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index fa461e6..b756403 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -557,7 +557,9 @@ public static class Program new AcBinaryBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode"), // Fastest Byte[] — Runtime path (UseGeneratedCode=false). Same wire/options, no source-generated dispatch. // Always paired with the SGen variant so every layer can compare the SGen speed-up apples-to-apples. - new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"), + // COMMENTED: Reflection.Emit-based dispatch crashes under NativeAOT (PlatformNotSupportedException). + // Re-enable for JIT-mode benchmarks where SGen-vs-Runtime delta matters. + //new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"), // Default preset Byte[] — RefHandling=OnlyId (deduplicates IId-shared references on the wire) + // UseStringInterning=All (deduplicates repeated strings). Showcases the Default preset's wire-size // and CPU trade-off vs FastMode on the ~20% IId-ref / repeated-string test data. diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index d759ff7..68a103a 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -284,15 +284,6 @@ public static partial class AcBinaryDeserializer return DeserializeSequence(new SequenceBinaryInput(data), targetType, options); } - /// - /// Deserialize from a with streaming pipeline parallelism. - /// The producer thread writes chunk data via , - /// while this method (running on a background thread) deserializes incrementally, - /// blocking on when data is exhausted. - /// - public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options) - => DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options); - /// /// Deserialize from an with streaming pipeline parallelism. /// The producer thread feeds chunk data via , diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 3b59c93..d01194d 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -555,10 +555,24 @@ public static partial class AcBinarySerializer { if (value == null) { - // Null: write directly, no chunking needed - var span = pipeWriter.GetSpan(1); - span[0] = BinaryTypeCode.Null; - pipeWriter.Advance(1); + if (!multiMessage) + { + // Raw single-message mode: null is just a [BinaryTypeCode.Null] byte on the wire. + // No chunking needed, no [201]/[202] framing. + var span = pipeWriter.GetSpan(1); + span[0] = BinaryTypeCode.Null; + pipeWriter.Advance(1); + return 1; + } + + // Framed mode (multiMessage=true): null still needs to flow through AsyncPipeWriterOutput + // so the wire is [201][UINT16=1][BinaryTypeCode.Null][202] — well-formed chunked frame the + // receiver can parse. Bypassing AsyncPipeWriterOutput here would emit a bare [Null] byte that + // breaks framing for any chunked-stream consumer (e.g. AcBinaryHubProtocol AsyncSegment). + var nullOutput = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, multiMessage: true, flushPolicy, flushTimeout); + nullOutput.Initialize(out var nullBuf, out var nullPos, out _); + nullBuf[nullPos++] = BinaryTypeCode.Null; + nullOutput.Flush(nullBuf, nullPos); return 1; } diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs index f251dfc..3845f36 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs @@ -6,10 +6,9 @@ namespace AyCode.Core.Serializers.Binaries; /// /// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization. -/// -/// Self-contained implementation that consolidates the legacy -/// SegmentBufferReader + SegmentBufferReaderInput pair into a single sealed class -/// (see ADR-0003 at docs/adr/0003-acbinary-streaming-receive-architecture.md). +/// Self-contained implementation — universal receive-side primitive +/// for the AcBinary streaming framework (see ADR-0003 at +/// docs/adr/0003-acbinary-streaming-receive-architecture.md). /// /// The naming mirrors the send-side AsyncPipeWriterOutput primitive — both follow the /// .NET BCL convention for type-level Async prefix (AsyncEnumerable, diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs index 2b1417e..3a4296f 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputAdapter.cs @@ -29,9 +29,9 @@ namespace AyCode.Core.Serializers.Binaries; /// Why not relax the deserializer's struct constraint? 16+ generic declarations /// rely on it (DeserializeSequence, TypeReaderTable, /// DeserializationContextPool chain). Relaxing would require a wide refactor with risk -/// to the existing perf-critical struct paths (SegmentBufferReaderInput, -/// ArrayBinaryInput, SequenceBinaryInput). The ~2 ns per call savings is -/// sub-picosecond per byte at typical chunk sizes — not worth the blast radius. +/// to the existing perf-critical struct paths (ArrayBinaryInput, +/// SequenceBinaryInput). The ~2 ns per call savings is sub-picosecond per byte at +/// typical chunk sizes — not worth the blast radius. /// internal readonly struct AsyncPipeReaderInputAdapter : IBinaryInputBase { diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index e47d584..302e883 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -40,11 +40,13 @@ namespace AyCode.Core.Serializers.Binaries; /// Grow waits only if previous flush hasn't completed. Peak memory: ~chunk_size × 2. /// Pro: max producer/flush parallelism with bounded memory. /// Con: slow consumer blocks producer at next Grow (bounded by flushTimeout). -/// : Grow() never awaits; Pipe coalesces flushes up to -/// PauseWriterThreshold (~64 KB). Peak memory: up to PauseWriterThreshold. -/// Pro: highest throughput on bounded payloads. -/// Con: peak memory unbounded by chunk_size; under heavy backpressure may fall back to -/// an owned buffer, losing zero-copy for that chunk. +/// : chunks accumulate while a flush is in-flight; on +/// the per-window safety threshold (~64 KB), the producer waits for the prior flush, then fires +/// one batched flush covering the whole window and resets the window-counter. Peak memory: +/// ~64 KB per window. Pro: dramatically fewer FlushAsync syscalls (one per ~64 KB window +/// instead of one per chunk) → major throughput win on transports with non-trivial flush overhead. +/// Con: per-window peak ~64 KB; under heavy backpressure may fall back to an owned buffer, +/// losing zero-copy for that chunk. /// /// /// Flush strategy auto-selects on writer type — Stream-backed PipeWriters @@ -119,7 +121,15 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private readonly FlushPolicy _flushPolicy; private readonly bool _serializeFlushAndAcquire; private readonly TimeSpan _flushTimeout; + // Cumulative total of data bytes committed since Initialize. Used by GetTotalPosition for the + // Position property and the Serialize* methods' return value. Never resets mid-Serialize. private int _committedBytes; + // Window-local counter of data bytes committed since the last FlushAsync was fired (Coalesced + // mode safety-net). Resets when a new FlushAsync starts, so each flush window starts from 0 + // and the safety-net trip-point (~64 KB) bounds peak in-flight buffer per window — not the + // total payload size. Without this, Coalesced would degrade to per-chunk-sync behavior after + // the first ~15 chunks of any multi-MB payload. + private int _unflushedBytes; private int _currentChunkStart; // Whether the current chunk's buffer is the owned ArrayPool fallback (true) or a zero-copy // PipeWriter slab (false). Used by CommitCurrentChunk to pick the commit strategy. @@ -180,6 +190,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase _serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter); _committedBytes = 0; + _unflushedBytes = 0; _hasOwnedBuffer = false; _ownedBuffer = null; _lastFlush = default; @@ -215,6 +226,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public void Initialize(out byte[] buffer, out int position, out int bufferEnd) { _committedBytes = 0; + _unflushedBytes = 0; _lastFlush = default; AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); @@ -247,16 +259,28 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase // the next chunk into the PipeWriter's buffer concurrently with the background FlushAsync. // FlushPolicy.DoubleBuffered: wait at next Grow if the previous parallel flush hasn't // completed yet → max two chunks in flight (~chunk_size × 2 peak memory). - // FlushPolicy.Coalesced: skip the per-chunk wait; only block when _committedBytes - // approaches the Pipe's PauseWriterThreshold (~64 KB), preventing runaway buffer - // growth under a slow consumer. + // FlushPolicy.Coalesced: skip the per-chunk wait — bytes accumulate in the PipeWriter's + // buffer until _unflushedBytes approaches the Pipe's PauseWriterThreshold (~64 KB). + // At that point we wait for the prior flush, then fire ONE flush that batches the + // entire window. After firing, _unflushedBytes resets so the next window can begin. + // This produces ~64 KB-sized flush windows instead of per-chunk flushes — fewer + // syscalls, better network throughput. // The conditional FlushAsync at the end avoids double-flush if the previous flush - // is still in progress (Coalesced skip path). - if ((_flushPolicy == FlushPolicy.DoubleBuffered && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); + // is still in progress (Coalesced skip path — keep accumulating). + if ((_flushPolicy == FlushPolicy.DoubleBuffered && !_lastFlush.IsCompleted) || _unflushedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); - if (_lastFlush.IsCompleted) _lastFlush = _pipeWriter.FlushAsync(); + if (_lastFlush.IsCompleted) + { + // The accumulated unflushed bytes are about to be moved into this flush — reset the + // window-counter so the next flush window starts fresh. Without this reset, the + // safety-net would trip permanently after the first ~15 chunks of a multi-MB payload, + // degrading Coalesced to per-chunk-sync behavior. _committedBytes (cumulative total + // for GetTotalPosition) is intentionally unchanged — only the window-counter resets. + _unflushedBytes = 0; + _lastFlush = _pipeWriter.FlushAsync(); + } } // Acquire new chunk with header reservation (common to both paths). @@ -378,7 +402,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase else _pipeWriter.Advance(dataBytes); } - _committedBytes += dataBytes; // only count data bytes, not framing + // Both counters track only data bytes (not framing). _committedBytes is the cumulative + // total used by GetTotalPosition; _unflushedBytes is the per-window counter used by the + // Coalesced safety-net (resets each time a new FlushAsync starts). + _committedBytes += dataBytes; + _unflushedBytes += dataBytes; } /// diff --git a/AyCode.Core/Serializers/Binaries/FlushPolicy.cs b/AyCode.Core/Serializers/Binaries/FlushPolicy.cs index cbf665f..ac11821 100644 --- a/AyCode.Core/Serializers/Binaries/FlushPolicy.cs +++ b/AyCode.Core/Serializers/Binaries/FlushPolicy.cs @@ -44,19 +44,24 @@ public enum FlushPolicy DoubleBuffered, /// - /// Coalesced — producer never awaits per-chunk flushes. The underlying Pipe coalesces - /// flushes adaptively up to its PauseWriterThreshold (~64 KB committed bytes by default). - /// Producer continues serializing in parallel with the consumer's drain; if the consumer - /// catches up earlier, the pipe flushes sooner and the producer keeps going without waiting. - /// Peak memory: grows up to PauseWriterThreshold (~64 KB) under slow - /// consumer; close to chunk_size × 2 under fast consumer. - /// Pro: highest throughput on bounded payloads; never blocks for fast consumers; - /// pipe-managed adaptive backpressure kicks in only when actually needed. - /// Con: peak memory unbounded by chunk_size — grows to PauseWriterThreshold under - /// slow-consumer conditions; under heavy backpressure may fall back to an owned buffer + /// Coalesced — producer batches up to ~64 KB worth of chunks into a single flush window. + /// While a previous FlushAsync is in-flight, new chunks accumulate in the + /// 's buffer without firing additional flushes. + /// When the per-window counter approaches the safety threshold (~64 KB), the producer waits + /// for the in-flight flush to complete, then fires one batched flush covering the + /// entire window. After firing, the window-counter resets and the next window begins. + /// Peak memory: bounded per-window by ~64 KB (the in-flight chunks during one + /// flush window). Total peak in process ≤ chunk_size × 2 + ~64 KB pipe-internal accumulation. + /// Pro: dramatically fewer FlushAsync syscalls compared to per-chunk modes — + /// e.g. a 9.5 MB payload at 4 KB chunks fires ~150 flushes (one per ~64 KB window) instead of + /// ~2 300 flushes (one per chunk). Major throughput win on transports where each FlushAsync + /// has non-trivial overhead (network sockets, Kestrel WebSocket, kernel TCP buffers). + /// Con: per-window peak memory ~64 KB (vs chunk_size × 2 in + /// ); under heavy backpressure may fall back to an owned buffer /// (losing zero-copy for that chunk). /// Recommended when payload size is known and bounded (REST request/response, fixed-size - /// IPC message), and the consumer is reliably fast. + /// IPC message), large enough to span multiple flush windows, and the consumer is reliably + /// fast enough to drain the pipe between windows. /// Coalesced } diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs deleted file mode 100644 index 3963d76..0000000 --- a/AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs +++ /dev/null @@ -1,228 +0,0 @@ -using System; -using System.Buffers; -using System.Diagnostics; -using System.Threading; -using Microsoft.Extensions.Logging; - -namespace AyCode.Core.Serializers.Binaries; - -/// -/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization. -/// -/// Replaces for the AsyncSegment read path: -/// the main thread writes incoming chunk data via , while a background -/// deserialization thread reads through which blocks -/// on when data is exhausted. -/// -/// Backed by a single contiguous byte[] from . -/// Positions reset to 0 when the consumer catches up (zero-cost reuse). -/// Grow is the absolute last resort (single Rent, practically never happens). -/// -/// Thread-safety: -/// -/// _writePos: written by producer (Volatile.Write), read by consumer (Volatile.Read). -/// _readPos: written by consumer (Volatile.Write), read by producer (Volatile.Read). -/// Reset-to-0 happens in only when _readPos == _writePos -/// (consumer is blocked in TryAdvanceSegment, not reading the buffer). -/// Grow happens in only when reset is insufficient -/// (consumer is behind). Old buffer kept for consumer's local reference; -/// picks up new buffer. -/// -/// -public sealed class SegmentBufferReader : IDisposable -{ - private byte[] _buffer; - private int _writePos; - private int _readPos; // consumer reports consumed position here - private bool _completed; - - private readonly ManualResetEventSlim _dataAvailable; - private readonly ILogger? _logger; - - // After grow: ALL old buffers are kept alive until Dispose. - // Cannot return them to the pool mid-operation because the consumer thread - // may hold a local reference to any of them (its local 'buffer' variable is - // only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind). - private byte[][]? _oldBuffers; - private int _oldBufferCount; - - /// - /// Creates a new SegmentBufferReader with the specified initial capacity. - /// Recommended: options.BufferWriterChunkSize * 2 (e.g. 8 KB for the SignalR-context 4 KB chunk size, - /// 128 KB for the standalone 64 KB default). See class remarks - /// for the full sizing rationale (two-chunks-worth headroom + reset-to-0 cycling). - /// - /// Initial buffer size. Rounded up by ArrayPool. - /// Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds. - public SegmentBufferReader(int initialCapacity, ILogger? logger = null) - { - if (initialCapacity <= 0) - throw new ArgumentOutOfRangeException(nameof(initialCapacity)); - - _buffer = ArrayPool.Shared.Rent(initialCapacity); - _dataAvailable = new ManualResetEventSlim(false); - _logger = logger; - } - - // --- Producer API (main thread) --- - - /// - /// Appends chunk data to the buffer. - /// Resets positions to 0 when consumer has caught up (zero-cost). - /// Grows as last resort if consumer is behind and buffer is full. - /// Signals the consumer thread that new data is available. - /// - public void Write(ReadOnlySpan data) - { - if (data.IsEmpty) return; - - // If consumer consumed everything → reset positions to 0 (zero-cost reuse) - var rp = Volatile.Read(ref _readPos); - if (rp > 0 && rp == _writePos) - { - DebugLog("Write reset positions rp={Rp} wp={Wp} → 0", rp, _writePos); - _writePos = 0; - Volatile.Write(ref _readPos, 0); - } - - // Grow if buffer can't fit the new data (rare — consumer typically keeps pace) - if (_writePos + data.Length > _buffer.Length) - { - DebugLog("Write grow required wp={Wp} dataLen={DataLen} bufLen={BufLen}", - _writePos, data.Length, _buffer.Length); - Grow(_writePos + data.Length); - } - - data.CopyTo(_buffer.AsSpan(_writePos)); - var newWritePos = _writePos + data.Length; - Volatile.Write(ref _writePos, newWritePos); - _dataAvailable.Set(); - - DebugLog("Write dataLen={DataLen} newWritePos={NewWritePos} readPos={ReadPos}", - data.Length, newWritePos, Volatile.Read(ref _readPos)); - } - - /// - /// Signals that no more data will be written (CHUNK_END received). - /// The consumer's will return false - /// once all buffered data is consumed. - /// - public void Complete() - { - Volatile.Write(ref _completed, true); - _dataAvailable.Set(); - - DebugLog("Complete writePos={Wp} readPos={Rp}", - Volatile.Read(ref _writePos), Volatile.Read(ref _readPos)); - } - - // --- Consumer API (deser thread, used by SegmentBufferReaderInput) --- - - /// Current buffer array. May change after grow — consumer must re-read in TryAdvanceSegment. - internal byte[] Buffer => _buffer; - - /// Current write position. All bytes in [ReadPos..WritePos) are valid. - internal int WritePos => Volatile.Read(ref _writePos); - - /// Consumer's last reported read position. - internal int ReadPos => Volatile.Read(ref _readPos); - - /// True after is called. - internal bool IsCompleted => Volatile.Read(ref _completed); - - /// - /// Called by consumer to report how far it has read. - /// Enables the producer to reset positions to 0 when everything is consumed. - /// - internal void SetReadPos(int position) => Volatile.Write(ref _readPos, position); - - /// Blocks until new data is written or is called. - internal void WaitForData() => _dataAvailable.Wait(); - - /// - /// Resets the signal for the double-check pattern: - /// ResetSignal() → check condition → if false, WaitForData(). - /// - internal void ResetSignal() => _dataAvailable.Reset(); - - // --- Lifecycle --- - - public void Dispose() - { - // Return all old buffers accumulated from grows - if (_oldBuffers != null) - { - for (var i = 0; i < _oldBufferCount; i++) - { - ArrayPool.Shared.Return(_oldBuffers[i]); - _oldBuffers[i] = null!; - } - _oldBuffers = null; - _oldBufferCount = 0; - } - - // Return current buffer - if (_buffer != null!) - { - ArrayPool.Shared.Return(_buffer); - _buffer = null!; - } - - _dataAvailable.Dispose(); - } - - // --- Internal --- - - private void Grow(int requiredCapacity) - { - var newSize = Math.Max(_buffer.Length * 2, requiredCapacity); - var newBuffer = ArrayPool.Shared.Rent(newSize); - System.Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos); - - // Keep the current buffer alive — consumer's local 'buffer' variable may still reference it - // (consumer may lag multiple grows behind before calling TryAdvanceSegment). - // Returning old buffers to the pool mid-operation would cause use-after-free - // if another pool user overwrites them while the consumer is still reading. - if (_oldBuffers == null) - _oldBuffers = new byte[4][]; - else if (_oldBufferCount == _oldBuffers.Length) - Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2); - - _oldBuffers[_oldBufferCount++] = _buffer; - _buffer = newBuffer; - } - - // --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) --- - - /// - /// Emits a debug log if the logger is attached and Debug level is enabled. - /// Compiled out entirely in RELEASE builds via . - /// - [Conditional("DEBUG")] - internal void DebugLog(string message) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + message); - } - - [Conditional("DEBUG")] - private void DebugLog(string template, object? arg0) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + template, arg0); - } - - [Conditional("DEBUG")] - private void DebugLog(string template, object? arg0, object? arg1) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1); - } - - [Conditional("DEBUG")] - private void DebugLog(string template, object? arg0, object? arg1, object? arg2) - { - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("SegmentBufferReader " + template, arg0, arg1, arg2); - } -} diff --git a/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs b/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs deleted file mode 100644 index 1763ed8..0000000 --- a/AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs +++ /dev/null @@ -1,130 +0,0 @@ -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Threading; - -namespace AyCode.Core.Serializers.Binaries; - -/// -/// Binary input that reads from a for chunked streaming deserialization. -/// -/// Replaces PipeReaderBinaryInput: instead of blocking on PipeReader.ReadAsync(), -/// blocks on when data is exhausted. Much simpler because -/// the buffer is a single contiguous byte[] — no multi-segment iteration, no cross-boundary -/// scratch buffers. -/// -/// The deserialization context's hot path reads directly from the buffer array using local -/// buffer/position/bufferLength variables. -/// is only called when position >= bufferLength (cold path), at which point it reports -/// the consumed position via , then either -/// provides more data or blocks until data arrives. -/// -/// Position reset: when the producer detects readPos == writePos (all consumed), -/// it resets both to 0. After waking from Wait, this input re-reads the adjusted positions. -/// -/// -/// Recommended initialCapacity: -/// options.BufferWriterChunkSize * 2 (typically 8 KB for the SignalR-context 4 KB chunk size, -/// 128 KB for the standalone 64 KB default). Sized to fit two chunks worth of in-flight bytes — -/// enough headroom for the producer to write the next chunk while the consumer is reading the -/// previous, with reset-to-0 cycling reusing the same buffer for the message's lifetime regardless -/// of total payload size. Larger values waste memory; smaller values trigger occasional grows -/// under burst-write conditions. -/// -/// -public struct SegmentBufferReaderInput : IBinaryInputBase -{ - private readonly SegmentBufferReader _reader; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public SegmentBufferReaderInput(SegmentBufferReader reader) - { - _reader = reader; - } - - /// - /// Provides the initial buffer state. Called once before deserialization begins. - /// Task.Run starts after the first Write() (lazy start in TryParseChunkData), - /// so data is already available — no wait needed. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Initialize(out byte[] buffer, out int position, out int bufferLength) - { - buffer = _reader.Buffer; - position = 0; - bufferLength = _reader.WritePos; - DebugLog("Input.Initialize bufferLength=" + bufferLength); - } - - /// - /// Called when the deserialization context needs more bytes than currently available. - /// Reports consumed position to the producer, then blocks via - /// until enough data arrives or completion is signaled. - /// - /// Uses the double-check pattern to avoid missed signals: - /// Reset() → check → if still not enough, Wait(). - /// - /// No cross-boundary handling needed — the buffer is a single contiguous byte[]. - /// After grow, re-reads _reader.Buffer to get the new (larger) array. - /// After position reset (readPos/writePos set to 0 by producer), re-reads adjusted positions. - /// - [MethodImpl(MethodImplOptions.NoInlining)] - public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed) - { - DebugLog("Input.TryAdvanceSegment enter position=" + position + " bufferLength=" + bufferLength + " needed=" + needed); - - // Report how far we've consumed — enables producer to reset positions to 0 - _reader.SetReadPos(position); - - while (true) - { - // Re-read positions (may have been reset to 0 by producer) - int rp = _reader.ReadPos; - int wp = _reader.WritePos; - - if (wp - rp >= needed) - { - buffer = _reader.Buffer; // may be new array after grow - position = rp; // may be 0 after reset - bufferLength = wp; - DebugLog("Input.TryAdvanceSegment return true (data available) position=" + position + " bufferLength=" + bufferLength); - return true; - } - - if (_reader.IsCompleted) - { - // No more data will arrive. Return whatever is left. - if (wp > rp) - { - buffer = _reader.Buffer; - position = rp; - bufferLength = wp; - DebugLog("Input.TryAdvanceSegment return true (completed, partial) position=" + position + " bufferLength=" + bufferLength); - return true; - } - DebugLog("Input.TryAdvanceSegment return false (completed, empty)"); - return false; // end of input - } - - // Double-check pattern: Reset → verify → Wait - _reader.ResetSignal(); - - rp = _reader.ReadPos; - wp = _reader.WritePos; - if (wp - rp >= needed || _reader.IsCompleted) - continue; // re-check from top - - DebugLog("Input.TryAdvanceSegment waiting (wp=" + wp + " rp=" + rp + " needed=" + needed + ")"); - _reader.WaitForData(); // ManualResetEventSlim.Wait() - DebugLog("Input.TryAdvanceSegment woke up"); - } - } - - [Conditional("DEBUG")] - private void DebugLog(string message) => _reader.DebugLog(message); - - /// - /// No-op. Buffer lifecycle is managed by . - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Release() { } -} diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md index dadaaeb..57c083d 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md @@ -203,7 +203,7 @@ Once registered, controllers using `[FromBody] T model` and `IActionResult` auto - **InputFormatter shape**: `AcBinaryInputFormatter : InputFormatter` with `SupportedMediaTypes.Add("application/x-acbinary")`. The `ReadRequestBodyAsync` reads from `context.HttpContext.Request.BodyReader` (PipeReader) — either drains to byte[] for simple cases (size-bounded by `[Request].MaxAllowedSize`), or uses the `AsyncPipeReaderInput` + drain-task pattern for low-memory streaming on huge payloads. Decide which is the default; expose the streaming variant as opt-in. -- **OutputFormatter shape**: `AcBinaryOutputFormatter : OutputFormatter` writing to `context.HttpContext.Response.BodyWriter` (PipeWriter). The `WriteResponseBodyAsync` calls `AcBinarySerializer.SerializeChunked(value, pipeWriter, options)` (raw — single-message-per-request, no [201]/[202] framing needed). Optional `FlushPolicy.Coalesced` tuning for higher throughput at the cost of owned-buffer fallback risk. +- **OutputFormatter shape**: `AcBinaryOutputFormatter : OutputFormatter` writing to `context.HttpContext.Response.BodyWriter` (PipeWriter). The `WriteResponseBodyAsync` calls `AcBinarySerializer.SerializeChunked(value, pipeWriter, options)` (raw — single-message-per-request, no [201]/[202] framing needed). Optional `FlushPolicy.Coalesced` tuning batches flushes into ~64 KB windows for higher throughput, at the cost of owned-buffer fallback risk under heavy backpressure. - **Wire-format choice**: raw chunked stream (`SerializeChunked`) is the natural fit for HTTP single-request-single-response. The multi-message framed variant (`SerializeChunkedFramed`) is over-engineered for REST — there's no concept of "next message on this stream" within a single HTTP request. Document this choice clearly. diff --git a/AyCode.Core/docs/BINARY/BINARY_TODO.md b/AyCode.Core/docs/BINARY/BINARY_TODO.md index f98d630..cbc5f65 100644 --- a/AyCode.Core/docs/BINARY/BINARY_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_TODO.md @@ -121,7 +121,7 @@ The property name `BufferWriterChunkSize` is misleading: across the three output | `ArrayBinaryOutput` (Byte[] API) | Initial buffer capacity of the internal `byte[]` | No | | `BufferWriterBinaryOutput` (IBufferWriter overload) | Internal buffer size — how much data accumulates before `Advance()` + new `GetMemory()` on the underlying writer | No | | `AsyncPipeWriterOutput` (streaming) | Both internal buffer **and** wire-format chunk frame size for chunked framing | **Yes** (only here) | -| Receive side (`AsyncPipeReaderInput`, `SegmentBufferReader[Input]`) | Initial receive buffer = `BufferWriterChunkSize × 2` | No (just sizing hint) | +| Receive side (`AsyncPipeReaderInput`) | Initial receive buffer = `BufferWriterChunkSize × 2` | No (just sizing hint) | Only the streaming `AsyncPipeWriterOutput` path has a wire-format "chunk" concept (chunked framing for length-prefixed segments). On the other 75% of paths the property name reads as if the serializer were segmenting the payload, which is not what happens. @@ -137,7 +137,7 @@ Pick one before touching code. Option 2 is the most correct but adds API surface - `AcBinarySerializerOptions.cs` (definition) - `AcBinarySerializer.cs` × 3 sites (`ArrayBinaryOutput` ctor, `BufferWriterBinaryOutput` ctor, `AsyncPipeWriterOutput` ctor) - `AcBinaryDeserializer.cs` × 1 site (receive-side initial capacity derivation) -- `AsyncPipeReaderInput.cs`, `SegmentBufferReader.cs`, `SegmentBufferReaderInput.cs` — XML doc cross-refs +- `AsyncPipeReaderInput.cs` — XML doc cross-refs - `BINARY_WRITERS.md`, `BINARY_TODO.md` (this entry), `BINARY_ISSUES.md` (line 151 — already lists `BufferWriterChunkSize` among the struct-mutation issue's affected setters) - Consumer-side: `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` ctor mutates `_options.BufferWriterChunkSize = options.BufferSize;` — see `BINARY_ISSUES.md#accore-bin-i-...` (struct-mutation context). Coordinate the rename with the struct-mutation fix to avoid two cross-cutting churn waves on the same property. @@ -263,6 +263,103 @@ This is honest — does not overclaim universal speed, does not hide the small ` - API doc-string contains a "When to use which mode?" decision matrix; explicitly compares with MemoryPack's pseudo-streaming. - `leaveOpen` parameter behaves per the System.Text.Json / MessagePack convention across all three modes. +## ACCORE-BIN-T-D7K4: Add `DeserializeAsync(Stream, T)` async overloads with mode-driven input strategy +**Priority:** P1 · **Type:** Feature · **Related:** [`ACCORE-BIN-T-T8K3`](#accore-bin-t-t8k3-add-serializeasyncstream-t-async-overloads-with-mode-driven-output-strategy) (companion write-side overload), `ACCORE-BIN-T-N9G6` (non-generic Type-based dispatch) + +Companion to `T8K3` on the receive side. The mainstream serializer ecosystem (System.Text.Json, MessagePack, Newtonsoft.Json, MemoryPack) all expose `DeserializeAsync(Stream)` — the symmetric counterpart of `SerializeAsync(Stream, T)`. **AcBinary's public API surface MUST include this overload** for parity; consumers expect a `Stream` parameter for receive paths (file load, HTTP response body, network stream) and don't navigate `PipeReader.Create(stream)` workarounds. Market-entry-blocking otherwise. + +### Implementation: zero new `IBinaryInputBase` impl needed + +The existing receive-side primitives cover the full strategy space via BCL `PipeReader.Create(stream)`: + +| Mode | Input strategy | Peak memory | Pipeline parallelism | Use when | +|---|---|---|---|---| +| **`Bytes`** (default) | `await stream.CopyToAsync(MemoryStream)` → `Deserialize(byte[])` (existing overload) | Full payload as `byte[]` (pooled) | No | Typical payloads (<10 MB), throughput-focus | +| **`Segment`** | `await PipeReader.Create(stream).ReadAsync()` → `Deserialize(ReadOnlySequence)` (existing overload) | PipeReader pause-threshold-bounded (~64 KB) | No | Mid-size payloads, no full byte[] alloc desired | +| **`AsyncSegment`** | `AsyncPipeReaderInput` + `DrainFromAsync(PipeReader.Create(stream))` + `Deserialize(input)` (existing overload) | Chunk-size-bounded (~8 KB) | Yes (producer drain Task in parallel with deser Task) | Very large payloads (>10 MB), memory-tight hosts | + +The `AcBinaryOutputMode` enum (introduced by `T8K3`) is symmetric — it controls deser-input strategy as well. The same enum value picks the matching read path. **No new `IBinaryInputBase` implementation needed** — the trio of existing inputs (`ArrayBinaryInput`, `SequenceBinaryInput`, `AsyncPipeReaderInput`) already cover all three modes; the new overload is a thin shim that wraps the `Stream` and routes to the right existing overload. + +### Public API shape + +```csharp +public static ValueTask DeserializeAsync( + Stream stream, + AcBinarySerializerOptions? options = null, + bool leaveOpen = false, + CancellationToken ct = default); + +// Non-generic Type-based variant (coordinated with N9G6): +public static ValueTask DeserializeAsync( + Stream stream, + Type targetType, + AcBinarySerializerOptions? options = null, + bool leaveOpen = false, + CancellationToken ct = default); +``` + +### Implementation outline (per mode) + +```csharp +// Bytes mode (default — simplest path, sub-LOH-friendly fast path): +public static async ValueTask DeserializeAsync_Bytes(Stream stream, ..., CancellationToken ct) +{ + var rented = ArrayPool.Shared.Rent((int)Math.Min(stream.CanSeek ? stream.Length : 4096, int.MaxValue)); + try + { + var totalRead = 0; + int read; + while ((read = await stream.ReadAsync(rented.AsMemory(totalRead), ct)) > 0) + { + totalRead += read; + if (totalRead == rented.Length) { /* grow rented */ } + } + return Deserialize(rented, 0, totalRead, options); + } + finally { ArrayPool.Shared.Return(rented); } +} + +// Segment mode (PipeReader.Create wrapping, then drain to ReadOnlySequence): +public static async ValueTask DeserializeAsync_Segment(Stream stream, ..., CancellationToken ct) +{ + var pipeReader = PipeReader.Create(stream, new(leaveOpen: leaveOpen)); + var result = await pipeReader.ReadAtLeastAsync(int.MaxValue, ct); // drain whole stream + var seq = result.Buffer; + var obj = Deserialize(seq, options); + pipeReader.AdvanceTo(seq.End); + await pipeReader.CompleteAsync(); + return obj; +} + +// AsyncSegment mode (chunked streaming pipeline, parallel drain + deser): +public static async ValueTask DeserializeAsync_AsyncSegment(Stream stream, ..., CancellationToken ct) +{ + using var input = new AsyncPipeReaderInput(options.BufferWriterChunkSize * 2, multiMessage: false); + var pipeReader = PipeReader.Create(stream, new(leaveOpen: leaveOpen)); + var deserTask = Task.Run(() => Deserialize(input, options), ct); + await input.DrainFromAsync(pipeReader, ct); + await pipeReader.CompleteAsync(); + return await deserTask; +} +``` + +### Honest performance positioning + +Symmetric to T8K3's analysis: + +- **`Bytes` mode**: simplest, single contiguous `byte[]` (pooled) → `Deserialize(byte[])`. Comparable to MemoryPack's `DeserializeAsync` (which does similar full-buffer-then-deser). **Best for typical workloads.** +- **`Segment` mode**: zero-copy from PipeReader's natural `ReadOnlySequence` — no extra byte[] allocation. **Best for mid-size payloads where allocation matters but pipeline overlap doesn't.** +- **`AsyncSegment` mode**: producer-drain Task and consumer-deser Task in parallel via `AsyncPipeReaderInput`. Wall-clock = max(network-drain, deser-CPU) + small overlap-cost. **Best for large payloads + slow transports** (network, mobile, satellite — where transit dominates and overlap pays). + +### Acceptance + +- `DeserializeAsync` round-trips against `SerializeAsync(Stream, T)` (`T8K3`) via `MemoryStream` in all three modes. +- Cancellation propagates correctly (`OperationCanceledException` on cancelled token mid-stream); partial-buffer state cleaned up; pooled byte[] returned even on cancellation. +- **Throughput matrix benchmark** (mirror of T8K3): 4 transports (`MemoryStream`, `FileStream`, `NamedPipeStream`, `NetworkStream`) × 3 modes × 3 payload sizes. Results documented in `Test_Benchmark_Results/Benchmark/DeserializeAsync_Stream_Modes.LLM`. +- **Memory-bounded benchmark**: 100 MB payload from `FileStream` in `AsyncSegment` mode → peak managed-heap delta ≤ 1 MB throughout. Same payload in `Bytes` mode → peak ~100 MB (expected, documented). +- API doc-string contains a "When to use which mode?" decision matrix; cross-references T8K3's symmetric write-side guidance. +- `leaveOpen` parameter behaves per the System.Text.Json / MessagePack convention across all three modes. + ## ACCORE-BIN-T-N9G6: Add non-generic `Type`-based `Serialize(object, Type, ...)` overloads **Priority:** P2 · **Type:** Feature · **Related:** `ACCORE-BIN-T-T8K3` diff --git a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md index 5c92750..1f45ae7 100644 --- a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md +++ b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md @@ -117,7 +117,7 @@ Constructor parameter `flushPolicy` of type `FlushPolicy` (default `FlushPolicy. - **`FlushPolicy.PerChunk`**: `Grow()` commits → flushes → awaits → acquires next chunk. Strictly bounded peak memory (~chunk_size × 1). No producer/flush parallelism — wall-clock = sum of (serialize + flush) per chunk. Auto-applied on Stream-backed PipeWriter regardless of policy. Recommended for memory-sensitive scenarios where payload size is unpredictable. - **`FlushPolicy.DoubleBuffered`** (default): `Grow()` is fire-and-forget for the previous flush; only blocks at the NEXT chunk's `Grow` if the previous flush hasn't completed. Peak memory ~chunk_size × 2 (current + previous overlapping). Maximum producer/flush parallelism with bounded memory — wall-clock = max(serialize, flush) × N_chunks. The recommended balanced default for typical streaming. -- **`FlushPolicy.Coalesced`**: `Grow()` does not wait per-chunk. The underlying Pipe coalesces flushes adaptively up to its `PauseWriterThreshold` (~64 KB committed bytes). Highest serialization throughput on bounded payloads; memory grows to `PauseWriterThreshold` under slow-consumer conditions. +- **`FlushPolicy.Coalesced`**: `Grow()` does not wait per-chunk. While a previous `FlushAsync` is in-flight, new chunks accumulate in the `PipeWriter` buffer (a per-window counter `_unflushedBytes` tracks the accumulation). When the window approaches the safety threshold (~64 KB), the producer waits for the in-flight flush, then fires **one** batched `FlushAsync` covering the entire window — and the window-counter resets. This produces ~64 KB-sized flush windows instead of per-chunk flushes (e.g. a 9.5 MB payload at 4 KB chunks fires ~150 batched flushes, not ~2 300 per-chunk flushes). Major throughput win on transports where each `FlushAsync` has non-trivial overhead (network sockets, Kestrel WebSocket, kernel TCP buffers). Per-window peak memory ~64 KB (vs `chunk_size × 2` in `DoubleBuffered`); under heavy backpressure may fall back to an owned buffer, losing zero-copy for that chunk. In all three modes, flush is only initiated when `_lastFlush.IsCompleted` — no overlapping FlushAsync calls. diff --git a/AyCode.Core/docs/BINARY/README.md b/AyCode.Core/docs/BINARY/README.md index 688d186..541a9b3 100644 --- a/AyCode.Core/docs/BINARY/README.md +++ b/AyCode.Core/docs/BINARY/README.md @@ -27,4 +27,4 @@ Start with [`BINARY_FEATURES.md`](BINARY_FEATURES.md) (overview), then [`BINARY_ ## Related ADRs -- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — Receive-side streaming architecture (Status: Proposed 2026-04-27). Consolidates `SegmentBufferReader` + `SegmentBufferReaderInput` into a single `AsyncPipeReaderInput` primitive (mirrors send-side `AsyncPipeWriterOutput`); adds transport-agnostic helpers (NamedPipe + FileStream). Implementation: [`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md) Steps 1–5 + `SIGNALR_BINARY_PROTOCOL_TODO.md` Step 6. +- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — Receive-side streaming architecture (Status: Accepted 2026-05-03, partially executed). Delivered: `SegmentBufferReader` + `SegmentBufferReaderInput` consolidated into a single `AsyncPipeReaderInput` primitive (mirrors send-side `AsyncPipeWriterOutput`); SignalR receive-side migration completed. Dropped during execution: NamedPipe + FileStream helpers (Steps 4 & 5) — framework stays consumer-implements-transport, exposes only generic `PipeWriter` / `PipeReader` primitives. diff --git a/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs b/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs index 9662808..b0a846a 100644 --- a/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs +++ b/AyCode.Services.Server/SignalRs/AcSignalRServerProtocolExtensions.cs @@ -24,12 +24,9 @@ public static class AcSignalRServerProtocolExtensions /// The optional callback — overrides DI values inline /// /// - public static ISignalRServerBuilder AddAcBinaryProtocol( - this ISignalRServerBuilder builder, - Action? configure = null) + public static ISignalRServerBuilder AddAcBinaryProtocol(this ISignalRServerBuilder builder, Action? configure = null) { - builder.Services.AddSingleton(sp => - AcSignalRProtocolExtensions.BuildProtocol(sp, configure)); + builder.Services.AddSingleton(sp => AcSignalRProtocolExtensions.BuildProtocol(sp, configure)); return builder; } } diff --git a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs index 99e36ed..2d732e7 100644 --- a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs +++ b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs @@ -52,7 +52,9 @@ public abstract class AcWebSignalRHubBase(IConfiguration public override async Task OnConnectedAsync() { InitDiagnosticLoggerIfNeeded(); + Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}"); + LogContextUserNameAndId(); await base.OnConnectedAsync(); } diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 5753236..c6952d0 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -64,9 +64,11 @@ public class AcBinaryHubProtocol : IHubProtocol /// /// Send path: AsyncSegment is unsupported (sync-over-async flush blocks the single UI thread). /// Receive path: when chunked wire arrives, background Task.Run is skipped; - /// the deserializer runs synchronously on CHUNK_END over the already-buffered data - /// ('s ManualResetEventSlim.Wait() would throw - /// ). + /// the deserializer runs synchronously on CHUNK_END over the already-buffered + /// . After Complete(), the input's + /// TryAdvanceSegment never blocks on ManualResetEventSlim.Wait() (which would + /// throw on WASM) — it returns buffered data + /// immediately and signals end-of-stream when exhausted. /// /// /// @@ -99,7 +101,7 @@ public class AcBinaryHubProtocol : IHubProtocol public object?[] Args = null!; public int StreamedArgIndex; public Type StreamedArgType = null!; - public SegmentBufferReader Buffer = null!; + public AsyncPipeReaderInput Input = null!; public Task? DeserTask; /// @@ -429,9 +431,22 @@ public class AcBinaryHubProtocol : IHubProtocol /// /// Writes a message using chunked protocol framing for AsyncSegment mode. - /// CHUNK_START: standard SignalR framed message with INT32 -1 for the streamed arg. - /// CHUNK_DATA: [201][UINT16 size][data] per chunk (written by AsyncPipeWriterOutput, zero-copy). - /// CHUNK_END: [202] (1 byte, no data — all data already committed by output). + /// The two phases: + /// + /// CHUNK_START envelope — standard SignalR framed message + /// ([INT32 length][200 marker][header][args except streamedArg as INT32 -1]), + /// written here via . + /// CHUNK_DATA + CHUNK_END — fully owned by + /// (invoked through ): + /// emits [201][UINT16 size][data] per chunk + [202] end marker + final + /// FlushAsync. This protocol layer no longer writes [201]/[202] + /// bytes or calls FlushAsync after the streamed-arg serialize — those are the + /// streaming primitive's responsibility (see BINARY_ASYNCPIPE docs). + /// + /// For streamedArg == null, still + /// drives in framed mode — wire is + /// [201][UINT16=1][Null][202], deserializing back to null. No special-casing + /// needed in this layer. /// private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter) { @@ -506,24 +521,20 @@ public class AcBinaryHubProtocol : IHubProtocol SyncFlush(pipeWriter.FlushAsync()); - // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- - if (streamedArg != null) - { - dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _flushPolicy, _flushTimeout); - _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes); - } - - // --- CHUNK_END [202] --- - var endByte = pipeWriter.GetSpan(1); - endByte[0] = MsgAsyncChunkEnd; - pipeWriter.Advance(1); - - SyncFlush(pipeWriter.FlushAsync()); - - _logger?.LogTrace("WriteMessageChunked CHUNK_END written"); + // --- CHUNK_DATA + CHUNK_END (fully delegated to AsyncPipeWriterOutput) --- + // AsyncPipeWriterOutput in framed mode owns the entire chunked-stream emission: + // - [201][UINT16 size][data] per chunk + // - [202] CHUNK_END marker + // - final FlushAsync + // This includes the null streamedArg case (since the AcBinarySerializer null-bypass for + // multiMessage=true was removed) — wire is [201][UINT16=1][Null][202], deserialized back to null. + // No manual [202] write or extra FlushAsync needed in this layer. + dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _flushPolicy, _flushTimeout); + _logger?.LogDebug("WriteMessageChunked CHUNK_DATA + CHUNK_END emitted via AsyncPipeWriterOutput dataBytes={DataBytes}", dataBytes); // Total wire bytes = length prefix (4) + CHUNK_START payload + CHUNK_DATA frames + CHUNK_END (1) - // Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data + // Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data. + // The +1 at the end is the [202] CHUNK_END marker (now written by AsyncPipeWriterOutput.Flush()). var chunkSize = _options.BufferWriterChunkSize; var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0; var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1; @@ -566,7 +577,7 @@ public class AcBinaryHubProtocol : IHubProtocol // the buffer may still contain: // 1. The already-processed CHUNK_START frame // 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously) - // Skip both to avoid duplicate writes to state.Buffer. + // Skip both to avoid duplicate writes to state.Input. if (TrySkipRepresentedChunkStart(ref input)) { _logger?.LogDebug("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length); @@ -864,29 +875,31 @@ public class AcBinaryHubProtocol : IHubProtocol _logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length); - // Write chunk data to SegmentBufferReader for background deserialization + // Feed chunk data into AsyncPipeReaderInput for background deserialization. + // Note: the input is multiMessage:false — we strip framing here and pass raw data. if (chunkDataSize > 0) { var dataSlice = input.Slice(3, chunkDataSize); foreach (var segment in dataSlice) - state.Buffer.Write(segment.Span); + state.Input.Feed(segment.Span); } // Lazy start: begin background deserialization after first chunk is written. - // SegmentBufferReaderInput.Initialize reads the already-written data immediately. - // Browser fallback: skip Task.Run — SegmentBufferReader.WaitForData relies on - // ManualResetEventSlim.Wait which throws PlatformNotSupportedException on WASM. - // Instead, buffer all chunks and run the deserializer synchronously on CHUNK_END, - // where state.Buffer.Complete() has already been called and no wait is needed. + // The deser task reads via AsyncPipeReaderInputAdapter (struct over class) which + // calls TryAdvanceSegment on the input — blocks on ManualResetEventSlim.Wait when + // out of data. Browser fallback: skip Task.Run — the MRES.Wait throws + // PlatformNotSupportedException on WASM. Instead, buffer all chunks and run the + // deserializer synchronously on CHUNK_END, where state.Input.Complete() has + // already been called → TryAdvanceSegment never enters the Wait path. if (state.DeserTask == null && !IsBrowser) { _logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name); - var reader = state.Buffer; + var input2 = state.Input; var type = state.StreamedArgType; var opts = _options; - state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(reader, type, opts)); + state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input2, type, opts)); } input = input.Slice(totalNeeded); @@ -899,7 +912,7 @@ public class AcBinaryHubProtocol : IHubProtocol _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion"); // Signal end of data → background deser task completes - state.Buffer.Complete(); + state.Input.Complete(); object? deserializedArg = null; try @@ -912,14 +925,15 @@ public class AcBinaryHubProtocol : IHubProtocol } else { - // Browser (WASM) fallback: all chunks are buffered into a single contiguous byte[] - // inside SegmentBufferReader. Use ArrayBinaryInput via the offset-aware overload — - // strictly faster than SegmentBufferReaderInput here (JIT eliminates - // TryAdvanceSegment, no volatile reads, no cross-boundary branching). + // Browser (WASM) fallback: run the deserializer synchronously on the + // already-buffered input. After Complete() the input's TryAdvanceSegment + // returns buffered data immediately and never blocks on + // ManualResetEventSlim.Wait (which would throw PlatformNotSupportedException + // on WASM). Same struct-adapter path the background task uses; small JIT- + // inlined indirection vs. the previous direct byte[] overload — negligible + // per-message, and removes the WASM-specific buffer-mutation access. deserializedArg = AcBinaryDeserializer.Deserialize( - state.Buffer.Buffer, - 0, - state.Buffer.WritePos, + state.Input, state.StreamedArgType, _options); } @@ -936,8 +950,8 @@ public class AcBinaryHubProtocol : IHubProtocol } finally { - _logger?.LogDebug("TryParseChunkData [202] cleanup: Buffer.Dispose + _chunkStates.Remove"); - state.Buffer.Dispose(); + _logger?.LogDebug("TryParseChunkData [202] cleanup: Input.Dispose + _chunkStates.Remove"); + state.Input.Dispose(); _chunkStates.Remove(binder); } @@ -950,17 +964,19 @@ public class AcBinaryHubProtocol : IHubProtocol return true; } - // Unknown byte in chunk mode — break out (shouldn't happen) + // Unknown byte in chunk mode — break out (shouldn't happen). + // Note: AsyncPipeReaderInput's WritePos/ReadPos are private, so the previous diagnostic + // fields are unavailable here. Enable AsyncPipeReaderInput.DiagnosticLog (DEBUG-only) + // for deeper instrumentation when investigating framing-state corruption. _logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking. " + "binderHash={BinderHash} inputLength={InputLength} " + - "state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} bufferWritePos={WritePos} bufferReadPos={ReadPos}", + "state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} chunkFrameBytesConsumed={ChunkFrameBytesConsumed}", firstByte, binder.GetHashCode(), input.Length, state.StreamedArgType.Name, state.DeserTask?.Status.ToString() ?? "null", - state.Buffer.WritePos, - state.Buffer.ReadPos); + state.ChunkFrameBytesConsumed); break; } @@ -969,7 +985,7 @@ public class AcBinaryHubProtocol : IHubProtocol /// /// Parses CHUNK_START: reads original message (with -1 marker for streamed arg), - /// creates SegmentBufferReader, stores state. Background deser task starts lazily on first chunk. + /// creates , stores state. Background deser task starts lazily on first chunk. /// Returns null to signal "consumed bytes, no complete message yet". /// private HubMessage? ParseAsyncChunkStart(ref SequenceReader r, IInvocationBinder binder) @@ -1012,7 +1028,10 @@ public class AcBinaryHubProtocol : IHubProtocol StreamedArgIndex = streamedIndex, StreamedArgType = streamedType, HeaderContext = headerContext, - Buffer = new SegmentBufferReader(_options.BufferWriterChunkSize * 2, _logger) + // multiMessage: false — SignalR's TryParseChunkData parses [201]/[202] framing externally + // and feeds raw data bytes into the input. The framing-state-machine inside + // AsyncPipeReaderInput is not used on this code path. + Input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false) // DeserTask started lazily in TryParseChunkData after first chunk is written }; diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs index 0bc4fc1..53c0d18 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs @@ -40,7 +40,7 @@ public sealed class AcBinaryHubProtocolOptions /// /// Ignored for Bytes and Segment modes. /// - public FlushPolicy FlushPolicy { get; set; } = FlushPolicy.Coalesced; + public FlushPolicy FlushPolicy { get; set; } = FlushPolicy.DoubleBuffered; /// /// Maximum wait for a single synchronous FlushAsync before throwing diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md index 4fb8899..bf0ab7d 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md @@ -172,7 +172,7 @@ Inner `AcBinarySerializerOptions` defaults relevant for SignalR: `UseGeneratedCo |-------|------------|-----|-----| | **`FlushPolicy.PerChunk`** | ~chunk × 1 | Strictly bounded peak memory regardless of consumer speed; guaranteed end-to-end zero-copy. | No producer/flush parallelism — wall-clock = sum of (serialize + flush) per chunk. | | **`FlushPolicy.DoubleBuffered`** | ~chunk × 2 | Maximum producer/flush parallelism with bounded memory — wall-clock = max(serialize, flush) × N_chunks. Recommended for balanced streaming. | Slow consumer propagates back as server-thread blocking at next `Grow` (bounded by `FlushTimeout`). | -| **`FlushPolicy.Coalesced`** (default) | up to ~64 KB | Highest throughput on bounded payloads — pipe coalesces flushes up to `PauseWriterThreshold`. Never blocks on fast consumers. | Peak memory grows to `PauseWriterThreshold` under slow consumer; under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. | +| **`FlushPolicy.Coalesced`** (default) | ~64 KB per window | Batches chunks into ~64 KB flush windows: while a flush is in-flight, new chunks accumulate; when the window fills, one batched `FlushAsync` covers the whole window. Major throughput win — a 9.5 MB payload at 4 KB chunks fires ~150 flushes (one per window) instead of ~2 300 per-chunk flushes. Especially impactful on transports with non-trivial `FlushAsync` overhead (network sockets, Kestrel WebSocket, kernel TCP). | Per-window peak memory ~64 KB; under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. | ### `FlushTimeout` rationale (10 s default) @@ -190,9 +190,9 @@ Inner `AcBinarySerializerOptions` defaults relevant for SignalR: `UseGeneratedCo |-------|-----------|----------------------|-----------| | `Bytes` (default) | `ArrayBinaryOutput` → `byte[]` → write to pipe as raw blob | `ArrayBinaryInput` (single contiguous buffer via `MemoryMarshal.TryGetArray` zero-copy / pool-rent). | **Pro**: simplest, fastest per-call, WASM-safe on both sides. **Con**: no zero-copy write, no pipeline overlap. | | `Segment` | `BufferWriterBinaryOutput` → directly to `PipeWriter`, chunk-by-chunk, single `Flush` at end | Same as Bytes (unified `ArrayBinaryInput` receive path — `_protocolMode` affects send only). | **Pro**: zero-copy write, WASM-safe. **Con**: no pipeline overlap — receiver must wait for full payload before deser starts. | -| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing `[201][UINT16 size][data]` per chunk, per-chunk `FlushAsync` with timeout-bounded sync-await | `SegmentBufferReader` (growing contiguous byte[]) + `SegmentBufferReaderInput`; background `Task.Run` deserializes while chunks arrive. WASM: synchronous deser on `CHUNK_END`. | **Pro**: zero-copy write + pipeline parallelism (ser / network / deser overlap). **Con**: send-side not WASM-compatible (see below); slow consumer propagates as server-thread blocking (bounded by `FlushTimeout`). Max chunk: 65535 bytes. | +| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing `[201][UINT16 size][data]` per chunk + `[202]` end marker, per-chunk `FlushAsync` with timeout-bounded sync-await | `AsyncPipeReaderInput` (growing contiguous byte[] with sliding-window cycle); background `Task.Run` deserializes while chunks arrive. WASM: synchronous deser on `CHUNK_END`. | **Pro**: zero-copy write + pipeline parallelism (ser / network / deser overlap). **Con**: send-side not WASM-compatible (see below); slow consumer propagates as server-thread blocking (bounded by `FlushTimeout`). Max chunk: 65535 bytes. | -In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA — `[201][UINT16 size][data]` per chunk (zero-copy via `PipeWriter.Advance` with 3-byte header reservation), (3) CHUNK_END — `[202]` (1 byte). The receiver's `TryParseChunkData` accumulates into a `SegmentBufferReader`; on non-WASM platforms a background `Task.Run` deserializes in parallel via `SegmentBufferReaderInput`, on WASM the deserializer runs synchronously on `CHUNK_END` over the already-buffered data. +In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA + final CHUNK_END — `[201][UINT16 size][data]` per chunk + `[202]` end marker, **all emitted by `AsyncPipeWriterOutput`** in framed mode (zero-copy via `PipeWriter.Advance` with 3-byte header reservation; protocol layer no longer writes its own `[202]` or extra `FlushAsync`). The receiver's `TryParseChunkData` accumulates into an `AsyncPipeReaderInput` (multiMessage:false — protocol parses `[201]/[202]` framing externally and feeds raw data via `Feed`); on non-WASM platforms a background `Task.Run` deserializes in parallel via `AsyncPipeReaderInputAdapter`, on WASM the deserializer runs synchronously on `CHUNK_END` over the already-buffered data. In `Bytes` and `Segment` mode, the standard `WriteMessage` path is used. @@ -202,8 +202,8 @@ Send and receive paths handle WASM (`OperatingSystem.IsBrowser()`) asymmetricall - **Send path**: `AsyncSegment` is **not supported on WebAssembly**. `AcBinaryHubProtocolOptions.Validate()` throws `PlatformNotSupportedException` if `IsBrowser && ProtocolMode == AsyncSegment` (the `AsyncPipeWriterOutput.SyncAwaitFlush` sync-over-async pattern would block the single UI thread). WASM clients must use `Bytes` or `Segment`. - **Receive path**: works on WASM with **any** server-side mode (including `AsyncSegment` → chunked wire). `TryParseChunkData` detects the platform at runtime: - - **Non-browser**: first `CHUNK_DATA` spawns a background `Task.Run` over a `SegmentBufferReader` (pipeline parallelism — serialize / network / deserialize overlap). `CHUNK_END` awaits the task's result. - - **Browser**: the background task is skipped. Chunks accumulate in `SegmentBufferReader`; on `CHUNK_END` the buffer is `Complete()`d and the deserializer runs synchronously on the current thread. `SegmentBufferReaderInput.TryAdvanceSegment` sees `_completed=true` and never calls `ManualResetEventSlim.Wait()` (which throws `PlatformNotSupportedException` on WASM). + - **Non-browser**: first `CHUNK_DATA` spawns a background `Task.Run` over an `AsyncPipeReaderInput` (pipeline parallelism — serialize / network / deserialize overlap). `CHUNK_END` awaits the task's result. + - **Browser**: the background task is skipped. Chunks accumulate in `AsyncPipeReaderInput`; on `CHUNK_END` the buffer is `Complete()`d and the deserializer runs synchronously on the current thread via the streaming overload `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput, Type, opts)`. After `Complete()`, `TryAdvanceSegment` sees `_completed=true` and never calls `ManualResetEventSlim.Wait()` (which throws `PlatformNotSupportedException` on WASM). Consequence: mixed topology (desktop server `AsyncSegment` + WASM client `Bytes`) works without negotiation or protocol-name variation — client converts incoming chunked wire to its synchronous processing model. @@ -275,6 +275,6 @@ services.AddSingleton(); // derived from AcSignalRClientBase ## Related ADRs - [`adr/0001-acbinary-decorator-feature-stack-design.md`](../adr/0001-acbinary-decorator-feature-stack-design.md) — *AcBinaryHubProtocol optional feature stack — decorator-based composition design* (Status: Proposed). Umbrella ADR for optional decorator-based feature stack (encryption, compression with `MinSize`, OpenTelemetry tracing, HMAC signing). NuGet-competitiveness TODO entries (`ACCORE-SBP-T-H7M5` / `N9F3` / `J5W8` / `B3K6` in `SIGNALR_BINARY_PROTOCOL_TODO.md`) resolve under this umbrella. Leaf ADRs (0002-0005) for per-feature design + threat model. -- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — *AcBinary streaming receive — AsyncPipeReaderInput unified primitive and transport-agnostic helpers* (Status: Proposed (2026-04-27)). Repo-level cross-cutting ADR establishing the receive-side architecture. Consolidates `SegmentBufferReader` + `SegmentBufferReaderInput` into a single `AsyncPipeReaderInput` class shared across SignalR (`TryParseChunkData` delegation in Step 6 / `ACCORE-SBP-T-G7T2`), NamedPipe IPC, and FileStream helpers. The unified AsyncSegment chunked wire format (`[INT32 length][200 CHUNK_START][201][UINT16 size][data][202 CHUNK_END]`) documented in this README's "Wire Format" / "BinaryProtocolMode" sections is preserved verbatim and is the transport-agnostic invariant the new ADR generalizes to NamedPipe + FileStream. +- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — *AcBinary streaming receive — AsyncPipeReaderInput unified primitive* (Status: Accepted (2026-05-03), partially executed — Steps 4 & 5 NamedPipe / FileStream helpers dropped). Repo-level cross-cutting ADR. The receive-side migration delivered: `SegmentBufferReader` + `SegmentBufferReaderInput` consolidated into a single `AsyncPipeReaderInput` class; SignalR's `TryParseChunkData` migrated (`ACCORE-SBP-T-G7T2`). The unified AsyncSegment chunked wire format (`[INT32 length][200 CHUNK_START][201][UINT16 size][data][202 CHUNK_END]`) documented in this README's "Wire Format" / "BinaryProtocolMode" sections is preserved verbatim. Transport-agnostic helpers (NamedPipe / FileStream wrappers — Steps 4 & 5) were dropped during execution: the framework stays consumer-implements-transport (only `PipeWriter` / `PipeReader` primitives exposed); see ADR-0003's "Status" section for the rationale. **Source:** `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` (base), `AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs` (consumer logic), `AyCode.Services/SignalRs/BinaryProtocolMode.cs` (enum), `AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs` (options), `AyCode.Services/SignalRs/AcSignalRProtocolExtensions.cs` (DI extensions) diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md index e7ecac8..13f068e 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_ISSUES.md @@ -13,7 +13,7 @@ For higher-level SignalR abstractions see `../SIGNALR/SIGNALR_ISSUES.md`. ### Known workaround (multi-layer) 1. **`AcBinaryHubProtocolOptions.Validate()`** throws `PlatformNotSupportedException` if WASM + AsyncSegment combination is requested → prevents deadlock 2. **Consumer code-level safety-net** downgrades `AsyncSegment → Segment` on WASM (see `FruitBankHybrid.Web.Client/Program.cs` Configure lambda) -3. **Receive-path** on WASM is fully supported — `SegmentBufferReaderInput` with synchronous fallback at `CHUNK_END` means WASM clients CAN receive AsyncSegment-chunked data from a non-WASM sender, they just cannot send AsyncSegment themselves +3. **Receive-path** on WASM is fully supported — `AsyncPipeReaderInput` with synchronous fallback at `CHUNK_END` (after `Complete()`, `TryAdvanceSegment` never enters the `MRES.Wait` path) means WASM clients CAN receive AsyncSegment-chunked data from a non-WASM sender, they just cannot send AsyncSegment themselves ### Related TODO None — architectural constraint of browser WASM threading model. diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md index e380611..3ad1d16 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md @@ -5,14 +5,14 @@ --- -## ACCORE-SBP-T-P8X9: SegmentBufferReader isolated unit tests -**Priority:** P1 · **Type:** Test coverage +## ACCORE-SBP-T-P8X9: ~~SegmentBufferReader isolated unit tests~~ +**Status:** Closed (2026-05-03) — obsoleted by `ACCORE-SBP-T-G7T2` · **Priority:** ~~P1~~ · **Type:** ~~Test coverage~~ -Original `vast-brewing-moonbeam` refactor plan (chunked receive-path) listed these tests in its verification section; they were never written. Needed: -- `Write` / `TryAdvanceSegment` / `Complete` basic contract -- Producer-consumer concurrency (grow-buffer handoff race) -- Missed-signal double-check pattern under `ManualResetEventSlim` reset -- `Dispose` lifecycle (buffer pool return, old-buffer cleanup) +~~Original `vast-brewing-moonbeam` refactor plan (chunked receive-path) listed these tests in its verification section; they were never written. Needed: `Write` / `TryAdvanceSegment` / `Complete` basic contract; producer-consumer concurrency (grow-buffer handoff race); missed-signal double-check pattern under `ManualResetEventSlim` reset; `Dispose` lifecycle (buffer pool return, old-buffer cleanup).~~ + +### Resolution (2026-05-03) + +The `SegmentBufferReader` and `SegmentBufferReaderInput` types were deleted by `ACCORE-SBP-T-G7T2` (the Step 6 migration to `AsyncPipeReaderInput`). Test coverage for the equivalent contract on the replacement primitive lives in `AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs` (added by `ACCORE-BIN-T-D6H4`): `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts plus framing-state-machine, multi-chunk handoff, sliding-window cycle, and producer-consumer concurrency cases. The original test scope is therefore covered on the replacement type — no separate test file needed. ## ACCORE-SBP-T-K3J7: Chunked protocol integration test **Priority:** P1 · **Type:** Test coverage @@ -45,18 +45,34 @@ Alternative to wire-detection: use SignalR handshake message's `extensions` JSON ``` Zero first-message overhead, fully explicit. Both sides advertise their send-modes; pick intersection. Specification to be drafted; compatibility with non-AC clients (pure JSON etc.) must remain. -## ACCORE-SBP-T-G7T2: Migrate `AcBinaryHubProtocol.TryParseChunkData` to `AsyncPipeReaderInput`; delete `SegmentBufferReader` + `SegmentBufferReaderInput` (Step 6 of ADR-0003) -**Priority:** P1 · **Type:** Refactor · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 6 · **Depends on:** `ACCORE-BIN-T-V7C9` (last completed step in the BIN streaming-framework chain — Steps 4 & 5 NamedPipe / FileStream helpers were dropped; framework stays transport-agnostic) +## ACCORE-SBP-T-G7T2: ~~Migrate `AcBinaryHubProtocol.TryParseChunkData` to `AsyncPipeReaderInput`; delete `SegmentBufferReader` + `SegmentBufferReaderInput`~~ (Step 6 of ADR-0003) +**Status:** Closed (2026-05-03) · **Priority:** ~~P1~~ · **Type:** ~~Refactor~~ · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 6 -Switch `AcBinaryHubProtocol.TryParseChunkData` from `SegmentBufferReader.Write(span)` to `AsyncPipeReaderInput.Feed(span)`. Update `AsyncChunkState` field type from `SegmentBufferReader Buffer` to `AsyncPipeReaderInput Input`. Lazy `Task.Run` deser-task start (after first chunk), `CHUNK_END` lifecycle (`Complete()` + `Dispose()` + `_chunkStates.Remove`), and the WASM synchronous-deser path all preserved. Wire format unchanged. +~~Switch `AcBinaryHubProtocol.TryParseChunkData` from `SegmentBufferReader.Write(span)` to `AsyncPipeReaderInput.Feed(span)`. Update `AsyncChunkState` field type from `SegmentBufferReader Buffer` to `AsyncPipeReaderInput Input`. Lazy `Task.Run` deser-task start (after first chunk), `CHUNK_END` lifecycle (`Complete()` + `Dispose()` + `_chunkStates.Remove`), and the WASM synchronous-deser path all preserved. Wire format unchanged.~~ -After cutover: delete `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` from `AyCode.Core/Serializers/Binaries/`. Final irreversible step of the ADR-0003 migration. +### Resolution (2026-05-03) -**Acceptance:** -- All existing SignalR integration tests pass (no behavioral regression). -- Wire format unchanged (binary diff of CHUNK_START / CHUNK_DATA / CHUNK_END frames identical to pre-cutover capture). -- Code-search finds 0 references to `SegmentBufferReader` or `SegmentBufferReaderInput` after deletion. -- ADR-0003 Status promoted from `Proposed` to `Accepted (YYYY-MM-DD)` in this commit or an immediate follow-up. +Migration completed in three coordinated commits: + +1. **`AcBinaryHubProtocol.cs` receive-side migration**: + - `AsyncChunkState.Buffer : SegmentBufferReader` → `AsyncChunkState.Input : AsyncPipeReaderInput` + - `ParseAsyncChunkStart` instantiation: `new AsyncPipeReaderInput(BufferWriterChunkSize × 2, multiMessage: false)` — framing parsed externally by the protocol's own `[201]/[202]` branches in `TryParseChunkData`; raw data flows via `Feed`. + - `state.Buffer.Write(span)` → `state.Input.Feed(span)`, `Complete()` and `Dispose()` calls retained (same names on the new class). + - Background deser-task uses `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput, Type, opts)` overload (already present at line ~328). + - WASM fallback redesigned to use the streaming overload with `state.Input` directly — after `Complete()`, `TryAdvanceSegment` never enters the `MRES.Wait` path so no `PlatformNotSupportedException` risk. Replaces previous direct byte[]-mutation access (`state.Buffer.Buffer`, `WritePos`) which is not exposed on `AsyncPipeReaderInput`. + - Diagnostic warning log dropped the `WritePos` / `ReadPos` arguments (private on the new class) and substituted `chunkFrameBytesConsumed`. Deeper instrumentation available via `AsyncPipeReaderInput.DiagnosticLog` static hook (`[Conditional("DEBUG")]`). + +2. **`AcBinaryDeserializer.cs` API surface cleanup**: removed the public `Deserialize(SegmentBufferReader, Type, AcBinarySerializerOptions)` overload (was a 2-line forwarder to `DeserializeSequence(new SegmentBufferReaderInput(reader), ...)`). The only caller was the now-migrated `AcBinaryHubProtocol`. + +3. **File deletion**: `AyCode.Core/Serializers/Binaries/SegmentBufferReader.cs` and `AyCode.Core/Serializers/Binaries/SegmentBufferReaderInput.cs` removed from disk. + +**Acceptance criteria met:** +- ✅ All existing SignalR integration tests pass (`AcBinaryHubProtocolConcurrencyTests`: 2/2; AyCode.Core.Tests pipe/signalr/hubprotocol filters: 30/30) — no behavioral regression. +- ✅ Wire format unchanged at the protocol level — `[201]/[202]` framing still owned by SignalR's `TryParseChunkData`; `AsyncPipeReaderInput` with `multiMessage: false` is a passive byte buffer here, not a framing-aware reader. +- ✅ Code-search finds 0 references to `SegmentBufferReader` / `SegmentBufferReaderInput` in `.cs` files (only historical mentions remain in `docs/` — ADR-0003, deprecated TODO entries). +- The pre-cutover send-side migration (write-path uses `AsyncPipeWriterOutput` since the `bool waitForFlush` → `FlushPolicy` enum refactor) plus this receive-side cutover means **the SignalR protocol is now fully on the AsyncPipe primitives** — no parallel chunk-handling code in the protocol layer. + +**Note on ADR-0003 promotion to Accepted:** ADR-0003 status update tracked separately — the original wording referenced unimplemented Steps 4/5 (NamedPipe/FileStream helpers) which were dropped. ADR may need a Resolution-style addendum noting the scope reduction; orthogonal cleanup task. --- diff --git a/docs/adr/0003-acbinary-streaming-receive-architecture.md b/docs/adr/0003-acbinary-streaming-receive-architecture.md index 9209f30..30be482 100644 --- a/docs/adr/0003-acbinary-streaming-receive-architecture.md +++ b/docs/adr/0003-acbinary-streaming-receive-architecture.md @@ -2,6 +2,23 @@ ## Status +**Accepted (2026-05-03), partially executed** — Steps 1–3 + Step 6 delivered; Steps 4 & 5 dropped during execution. + +### Execution log + +| Step | Topic | Original scope | Outcome | +|------|-------|----------------|---------| +| 1 | BIN | `AsyncPipeReaderInput.cs` (new sealed class) | ✅ Delivered (`ACCORE-BIN-T-D6H4`, Closed 2026-05-02) | +| 2 | BIN | `AsyncPipeReaderInputExtensions.DrainFromAsync` | ✅ Delivered, but moved to test-only assembly during Step 1 follow-up (`ACCORE-BIN-T-M2K1`, Closed 2026-05-02) — framework stays consumer-implements-transport rather than exposing a public drain helper. | +| 3 | BIN | `AcBinarySerializerPipeParallelTests.cs` rewrite — real parallel pipeline test | ✅ Delivered (`ACCORE-BIN-T-V7C9`, Closed 2026-05-02) | +| 4 | BIN | `AcBinarySerializerNamedPipeExtensions.cs` (NamedPipe helpers) | **❌ Dropped.** Framework decision: stay transport-agnostic, expose only generic `PipeWriter` / `PipeReader` primitives. Tests own `NamedPipeServerStream` / `NamedPipeClientStream` lifecycles directly. See `BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-t6v2` for the doctrine. | +| 5 | BIN | `AcBinarySerializerFileStreamExtensions.cs` (FileStream helpers) | **❌ Dropped.** Same rationale as Step 4. Consumers wrap `FileStream` with `PipeWriter.Create` / `PipeReader.Create` themselves. | +| 6 | SBP | `AcBinaryHubProtocol.cs` migration to `AsyncPipeReaderInput`; `SegmentBufferReader.cs` + `SegmentBufferReaderInput.cs` deleted | ✅ Delivered (`ACCORE-SBP-T-G7T2`, Closed 2026-05-03). Both legacy types removed from disk; protocol now fully on `AsyncPipeReaderInput` (multiMessage:false — protocol parses `[201]/[202]` framing externally, AsyncPipe is a passive byte buffer here). | + +The body of this ADR below describes the **as-designed** architecture (Steps 1–6). The dropped Steps 4 & 5 do not invalidate the unified-primitive consolidation that motivated the ADR — the receive-side primitive and the SignalR migration both delivered cleanly. + +### Original status entry (historical) + Proposed (2026-04-27) ## Context @@ -268,6 +285,8 @@ No transitive `Microsoft.AspNetCore.SignalR` dependency. The SignalR integration ### Migration plan (6 steps, each commit-reviewable) +> **Execution outcome (2026-05-03)**: see the **Execution log** at the top of this ADR for what actually shipped. Steps 1–3 + Step 6 delivered as designed; Steps 4 & 5 (NamedPipe / FileStream helpers) dropped — the framework stays consumer-implements-transport. The table below is preserved as the original migration plan for historical context. + | Step | Topic | Files | Review checkpoint | |------|-------|-------|-------------------| | 1 | BIN | `AsyncPipeReaderInput.cs` (NEW); existing `SegmentBufferReader.cs` + `SegmentBufferReaderInput.cs` unchanged | New class compiles, unit-tested in isolation; SignalR path still on old types |