From e7b12a11008a723c5fa0e1e1a359706dfea4e67c Mon Sep 17 00:00:00 2001 From: Loretta Date: Sun, 3 May 2026 08:13:59 +0200 Subject: [PATCH] [LOADED_DOCS: 3 files, no new loads] Switch to FlushPolicy enum for streaming flush control Replaces the legacy bool waitForFlush with a new FlushPolicy enum (PerChunk, DoubleBuffered, Coalesced) across all binary streaming serialization APIs and SignalR protocol options. Updates all code, configuration, and documentation to use the new policy, clarifies memory/throughput trade-offs, and closes related TODOs. Stream-backed writers remain sequential; only parallel-capable Pipe-based writers honor the policy. --- AyCode.Core.Serializers.Console/Program.cs | 11 ++- .../AcBinarySerializerNamedPipeTests.cs | 10 +- .../Binaries/AcBinarySerializer.cs | 53 +++++----- .../Binaries/AsyncPipeWriterOutput.cs | 64 ++++++------ .../Serializers/Binaries/FlushPolicy.cs | 62 ++++++++++++ .../docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md | 18 +++- .../docs/BINARY/BINARY_ASYNCPIPE_TODO.md | 98 +++++++++++-------- AyCode.Core/docs/BINARY/BINARY_TODO.md | 59 ++++++++++- AyCode.Core/docs/BINARY/BINARY_WRITERS.md | 13 ++- .../SignalRs/AcBinaryHubProtocol.cs | 12 +-- .../SignalRs/AcBinaryHubProtocolOptions.cs | 21 ++-- .../docs/SIGNALR/SIGNALR_ISSUES.md | 2 +- AyCode.Services/docs/SIGNALR/SIGNALR_TODO.md | 2 +- .../docs/SIGNALR_BINARY_PROTOCOL/README.md | 13 +-- .../SIGNALR_BINARY_PROTOCOL_TODO.md | 2 +- 15 files changed, 304 insertions(+), 136 deletions(-) create mode 100644 AyCode.Core/Serializers/Binaries/FlushPolicy.cs diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index a879f29..fa461e6 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -63,7 +63,7 @@ public static class Program private const string IoNamedPipe = "NamedPipe"; private const string IoNamedPipeRaw = "NamedPipe"; private const string IoInMemoryPipe = "Pipe(in-mem)"; - private const 
string IoInMemoryRaw = "Bytes(in-mem)"; + private const string IoInMemoryRaw = "Pipe(in-mem)"; // Single source of truth for the chunk size used by ALL pipe-related benchmarks (NamedPipe PipeChunk, // NamedPipe PipeRaw, in-memory Pipe, in-memory RawMem) AND the NamedPipe server's inBufferSize/outBufferSize. @@ -1505,10 +1505,17 @@ public static class Program // Same 2-task streaming pipeline as NamedPipe variant — only the transport differs (in-memory Pipe // instead of kernel NamedPipe). Per-chunk SerializeChunkedFramed → PipeWriter slab → drain task // reads from PipeReader → input.Feed → consumer Deserialize consumes byte-by-byte. + // + // Uses the Pipe-overload (instead of the PipeWriter-overload) so the FlushPolicy parameter is + // exposed for tuning. Toggle between FlushPolicy.PerChunk (bounded peak memory, per-chunk await + // FlushAsync) and FlushPolicy.Coalesced (fire-and-forget per chunk, pipe-coalesced flushes up to + // PauseWriterThreshold ~64 KB) to A/B-test the streaming-pipeline overhead. FlushPolicy.DoubleBuffered + // is functionally equivalent to the PipeWriter-overload (both internally route to + // SerializeToPipeWriterCore with FlushPolicy.DoubleBuffered). 
_consumeDone.Reset(); _consumeRequest.Set(); - AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options); + AcBinarySerializer.SerializeChunkedFramed(_order, _pipe, _options, FlushPolicy.Coalesced); _consumeDone.Wait(); } diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs index cbef857..f03ebc0 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs @@ -7,8 +7,8 @@ using static AyCode.Core.Tests.TestModels.AcSerializerModels; namespace AyCode.Core.Tests.Serialization; /// -/// Cross-platform NamedPipe IPC roundtrip tests for AcBinarySerializer's transport-agnostic -/// streaming helpers (Step 4 of ADR-0003, ACCORE-BIN-T-A3T8). +/// Cross-platform NamedPipe IPC roundtrip tests proving AcBinarySerializer's streaming framework +/// works on arbitrary PipeWriter/PipeReader sources without per-transport adapters. /// /// The serializer/deserializer surface intentionally has NO NamedPipe-specific helpers — /// the tests own the / @@ -16,9 +16,9 @@ namespace AyCode.Core.Tests.Serialization; /// + /// /// primitives, with the receive-side drain implemented via the test-only -/// extension. This proves the streaming -/// framework works on arbitrary PipeWriter/PipeReader sources (NamedPipe, FileStream, -/// NetworkStream, custom transports) without per-transport adapters in the framework. +/// extension. The same generic +/// primitives apply to FileStream / NetworkStream / custom transports — consumers own the +/// transport lifecycle, framework stays transport-agnostic. /// /// With BufferWriterChunkSize = 256, even small test payloads cross multiple chunk /// boundaries on the wire — exercises the real chunking + sliding-window cycling behavior. 
diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 82c7949..3b59c93 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -434,26 +434,27 @@ public static partial class AcBinarySerializer /// Why instead of ? /// Pipe.Writer is always the BCL PipeWriterImpl, which is parallel-capable /// (no _tailMemory reset race like StreamPipeWriter). This overload exposes the - /// + tuning safely. + /// + tuning safely. /// /// The value to serialize; null writes a single null marker. /// Target pipe — caller drains pipe.Reader elsewhere. /// Serializer options (type wrappers, reference handling, interning, etc.). - /// - /// Per-chunk flush synchronization. true (default): maximum pipeline parallelism, - /// guaranteed zero-copy + zero-alloc, but slow consumers block the producer thread (bounded by - /// ). false: adaptive backpressure via memory threshold - /// (~64KB in-flight) — safer for mixed consumer speeds, never blocks on slow consumers. + /// + /// Per-chunk flush synchronization — see for the three trade-off + /// points. : strictly bounded ~chunk × 1 peak memory, no + /// producer/flush parallelism. (default): ~chunk × 2 + /// peak memory, max producer/flush parallelism. : up to + /// PauseWriterThreshold (~64 KB), highest throughput on bounded payloads. /// /// /// Per-flush timeout. null → wait forever. Positive value: throws /// on stuck consumers. /// /// Total serialized bytes written. - public static int SerializeChunked(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + public static int SerializeChunked(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, FlushPolicy flushPolicy = FlushPolicy.DoubleBuffered, TimeSpan? 
flushTimeout = null) { if (pipe is null) throw new ArgumentNullException(nameof(pipe)); - return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, multiMessage: false); + return SerializeToPipeWriterCore(value, pipe.Writer, options, flushPolicy, flushTimeout, multiMessage: false); } /// @@ -465,11 +466,11 @@ public static partial class AcBinarySerializer /// (PipeWriter.Create(stream) — NamedPipe / FileStream / NetworkStream / etc.) runs /// sequentially per chunk because the BCL impl resets _tailMemory on flush completion /// (race-incompatible with parallel send). Other PipeWriter implementations (Kestrel transport, - /// custom impls) run with the safe waitForFlush=true default — max parallelism, zero-alloc. + /// custom impls) run with the safe default — max parallelism, zero-alloc. /// /// Need runtime tuning of the flush strategy? Build a /// instance and use - /// + /// /// — only Pipe-based writers can guarantee parallel-capable flush behavior. /// /// Need a multiplexed wire format with per-chunk frame headers? See @@ -480,7 +481,7 @@ public static partial class AcBinarySerializer /// Serializer options (type wrappers, reference handling, interning, etc.). /// Total serialized bytes written. public static int SerializeChunked(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) - => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, multiMessage: false); + => SerializeToPipeWriterCore(value, pipeWriter, options, FlushPolicy.DoubleBuffered, flushTimeout: null, multiMessage: false); /// /// Serialize a value into a chunked stream where each chunk carries a self-describing @@ -499,58 +500,58 @@ public static partial class AcBinarySerializer /// this exact wire format to interleave many HubMessages over a single connection. /// /// Need a simpler streaming output without per-chunk metadata? 
Use - /// + /// /// — bit-compatible with 's /// byte[] output, no extra parser needed on the receive side. /// /// The value to serialize; null writes a single null marker. /// Target pipe — caller drains pipe.Reader elsewhere. /// Serializer options. - /// See . - /// See . + /// See . + /// See . /// Total serialized data bytes (excluding framing overhead). - public static int SerializeChunkedFramed(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + public static int SerializeChunkedFramed(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, FlushPolicy flushPolicy = FlushPolicy.DoubleBuffered, TimeSpan? flushTimeout = null) { if (pipe is null) throw new ArgumentNullException(nameof(pipe)); - return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, multiMessage: true); + return SerializeToPipeWriterCore(value, pipe.Writer, options, flushPolicy, flushTimeout, multiMessage: true); } /// /// Serialize to any with per-chunk frame headers /// (multiplexed wire format). See - /// + /// /// for the wire format details and use-cases. /// /// Flush strategy auto-selected by writer type — see /// . /// public static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) - => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, multiMessage: true); + => SerializeToPipeWriterCore(value, pipeWriter, options, FlushPolicy.DoubleBuffered, flushTimeout: null, multiMessage: true); /// /// Internal flush-tunable framed PipeWriter overload — used by AyCode.Services /// (SignalR hub protocol) on Kestrel transport output, which is parallel-capable. External /// callers should use the overload to safely tune - /// on a guaranteed parallel-capable writer. + /// on a guaranteed parallel-capable writer. 
/// - internal static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) - => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, multiMessage: true); + internal static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, FlushPolicy flushPolicy, TimeSpan? flushTimeout) + => SerializeToPipeWriterCore(value, pipeWriter, options, flushPolicy, flushTimeout, multiMessage: true); /// - /// Internal legacy alias for + /// Internal legacy alias for /// — kept until the SignalR hub protocol (AcBinaryHubProtocol.cs) is migrated to the /// new name in a separate, isolated step. Identical behavior to SerializeChunkedFramed /// (framed wire format with [201][UINT16][data] per chunk + [202] end marker). /// - internal static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) - => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, multiMessage: true); + internal static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, FlushPolicy flushPolicy, TimeSpan? flushTimeout) + => SerializeToPipeWriterCore(value, pipeWriter, options, flushPolicy, flushTimeout, multiMessage: true); /// /// Common pipe-output serialization core. Same loop for both raw () /// and framed () modes — the only difference flows through /// into the ctor. /// - private static int SerializeToPipeWriterCore(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout, bool multiMessage) + private static int SerializeToPipeWriterCore(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, FlushPolicy flushPolicy, TimeSpan? 
flushTimeout, bool multiMessage) { if (value == null) { @@ -564,7 +565,7 @@ public static partial class AcBinarySerializer var runtimeType = value.GetType(); var context = BinarySerializationContextPool.Get(options); - context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, multiMessage, waitForFlush, flushTimeout); + context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, multiMessage, flushPolicy, flushTimeout); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); try diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index d8320a2..e47d584 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -30,16 +30,21 @@ namespace AyCode.Core.Serializers.Binaries; /// and any custom multi-message protocol over a long-lived transport. /// /// -/// Backpressure modes (controlled by waitForFlush) — independent of framing: +/// Backpressure modes (controlled by ) — independent of framing: /// -/// waitForFlush=true (default): Grow() waits for the previous FlushAsync before -/// starting a new chunk. Pro: maximum pipeline parallelism, guaranteed end-to-end zero-copy. -/// Con: slow consumer propagates back as server-thread blocking (bounded by flushTimeout). -/// waitForFlush=false: Grow() is fire-and-forget per chunk; only blocks when committed -/// bytes exceed ~60 KB (memory threshold — itself an adaptive backpressure). -/// Pro: no per-chunk waits, safer with mixed consumer speeds. -/// Con: under heavy backpressure may fall back to an owned buffer, losing zero-copy -/// for that chunk. +/// : Grow() commits → flushes → awaits → acquires next. +/// Peak memory: ~chunk_size × 1. Pro: strictly bounded peak memory. +/// Con: no producer/flush parallelism. 
Auto-applied on Stream-backed PipeWriter +/// regardless of chosen policy. +/// (default): fire-and-forget previous flush; next +/// Grow waits only if previous flush hasn't completed. Peak memory: ~chunk_size × 2. +/// Pro: max producer/flush parallelism with bounded memory. +/// Con: slow consumer blocks producer at next Grow (bounded by flushTimeout). +/// : Grow() skips the per-chunk await and blocks only when committed bytes +/// approach PauseWriterThreshold (~64 KB). Peak memory: up to PauseWriterThreshold. +/// Pro: highest throughput on bounded payloads. +/// Con: peak memory unbounded by chunk_size; under heavy backpressure may fall back to +/// an owned buffer, losing zero-copy for that chunk. /// /// /// Flush strategy auto-selects on writer type — Stream-backed PipeWriters @@ -111,7 +116,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private readonly PipeWriter _pipeWriter; private readonly int _chunkSize; private readonly bool _multiMessage; - private readonly bool _waitForFlush; + private readonly FlushPolicy _flushPolicy; private readonly bool _serializeFlushAndAcquire; private readonly TimeSpan _flushTimeout; private int _committedBytes; @@ -144,10 +149,12 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase /// + [202] end-of-message marker on Flush. Receiver auto-resets between messages. /// false → single-message: raw AcBinary bytes only, byte-compatible with the single-shot byte[] output; /// caller signals end-of-message by closing the writer. See class summary. - /// See class summary — pipeline parallelism (true) vs adaptive (false). + /// See class summary — strictly bounded (), + /// double-buffered (, default), or coalesced + /// (). /// Per-flush timeout. null /// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers. - public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool multiMessage = true, bool waitForFlush = true, TimeSpan? 
flushTimeout = null) + public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool multiMessage = true, FlushPolicy flushPolicy = FlushPolicy.DoubleBuffered, TimeSpan? flushTimeout = null) { if (chunkSize > MaxChunkSize) throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max)."); @@ -155,7 +162,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase _pipeWriter = pipeWriter; _chunkSize = chunkSize; _multiMessage = multiMessage; - _waitForFlush = waitForFlush; + _flushPolicy = flushPolicy; // null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms). // A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller. @@ -222,29 +229,30 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase [MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.AggressiveOptimization)] public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) { - if (_serializeFlushAndAcquire) + if (_serializeFlushAndAcquire || _flushPolicy == FlushPolicy.PerChunk) { - // STREAMPIPEWRITER path — sequential per chunk: commit → flush → await → acquire. - // Stream-backed writers (NamedPipe / FileStream / NetworkStream) reset internal - // state (_tailMemory) at flush completion → cannot acquire-during-flush concurrently - // (the standard Stream-PipeWriter usage pattern is await-flush-before-next-write). - // waitForFlush / _committedBytes throttling don't apply here — the writer pattern - // enforces sequentiality intrinsically. + // SEQUENTIAL path — fully synchronous per chunk: commit → flush → await → acquire. + // Triggered by either: + // - Stream-backed PipeWriter (NamedPipe / FileStream / NetworkStream / etc.) — auto, + // forced by the BCL StreamPipeWriter._tailMemory reset race. 
+ // - FlushPolicy.PerChunk on a Pipe-based writer — explicit caller choice for + // strictly bounded peak memory (~chunk_size × 1). + // _committedBytes throttling doesn't apply — sequentiality is enforced per chunk. CommitCurrentChunk(buffer, position); SyncAwaitFlush(_pipeWriter.FlushAsync()); } else { - // PIPE-BASED path (Kestrel / SignalR) — parallel sender: serializer writes the next - // chunk into the PipeWriter's buffer concurrently with the background FlushAsync. - // waitForFlush=true: backpressure — wait for the previous parallel flush before - // starting a new one (prevents unbounded in-flight flushes). - // waitForFlush=false: adaptive — skip the wait, but force-await if _committedBytes + // PARALLEL paths (Pipe-based writer + DoubleBuffered or Coalesced) — serializer writes + // the next chunk into the PipeWriter's buffer concurrently with the background FlushAsync. + // FlushPolicy.DoubleBuffered: wait at next Grow if the previous parallel flush hasn't + // completed yet → max two chunks in flight (~chunk_size × 2 peak memory). + // FlushPolicy.Coalesced: skip the per-chunk wait; only block when _committedBytes // approaches the Pipe's PauseWriterThreshold (~64 KB), preventing runaway buffer - // growth when the consumer is slow. + // growth under a slow consumer. // The conditional FlushAsync at the end avoids double-flush if the previous flush - // is still in progress (waitForFlush=false skip path). - if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); + // is still in progress (Coalesced skip path). 
+ if ((_flushPolicy == FlushPolicy.DoubleBuffered && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); diff --git a/AyCode.Core/Serializers/Binaries/FlushPolicy.cs b/AyCode.Core/Serializers/Binaries/FlushPolicy.cs new file mode 100644 index 0000000..cbf665f --- /dev/null +++ b/AyCode.Core/Serializers/Binaries/FlushPolicy.cs @@ -0,0 +1,62 @@ +namespace AyCode.Core.Serializers.Binaries; + +/// +/// Controls per-chunk flush synchronization on the parallel-capable Pipe-based send path +/// (). Replaces the historical bool waitForFlush +/// parameter with three explicit memory-vs-throughput trade-off points. +/// +/// Stream-backed PipeWriter exception +/// (NamedPipe, FileStream, NetworkStream, etc.) is intrinsically sequential per chunk because of +/// the StreamPipeWriter._tailMemory reset race, regardless of policy. The auto-detected +/// sequential path is functionally equivalent to . Only Pipe-based writers +/// (BCL PipeWriterImpl, Kestrel transport, custom parallel-capable impls) honour the policy +/// distinction between the three values. +/// +public enum FlushPolicy +{ + /// + /// Sequential — every chunk fully flushes and awaits completion before the next is acquired. + /// Producer-side: commit → FlushAsync → await → acquire-next. + /// Peak memory: ~chunk_size × 1 (one in-flight chunk). + /// Pro: strictly bounded peak memory regardless of consumer speed; simplest + /// memory profile to reason about. Auto-applied on Stream-backed PipeWriter regardless of + /// chosen policy. + /// Con: no producer/flush parallelism — wall-clock = sum of (serialize + flush) + /// per chunk. + /// Recommended for unpredictable / unbounded payloads where memory must stay strictly + /// minimal regardless of consumer behaviour. 
+ /// + PerChunk, + + /// + /// Double-buffered — fire-and-forget the previous flush; block at the NEXT chunk's Grow + /// only if the previous flush hasn't completed yet. Allows two chunks in flight simultaneously: + /// the one being serialised + the one being flushed. + /// Peak memory: ~chunk_size × 2 (current + previous overlapping). + /// Pro: maximum producer/flush parallelism with bounded memory; serializer + /// continues with the next chunk while the previous one's flush completes in parallel. + /// Wall-clock = max(serialize, flush) × N_chunks instead of sum. + /// Con: a slow consumer propagates back as producer-thread blocking at the next + /// Grow (bounded by flushTimeout). + /// Recommended default for typical streaming scenarios — the best memory/throughput + /// trade-off when the payload is sized comparably to or larger than chunk_size. + /// + DoubleBuffered, + + /// + /// Coalesced — producer never awaits per-chunk flushes. The underlying Pipe coalesces + /// flushes adaptively up to its PauseWriterThreshold (~64 KB committed bytes by default). + /// Producer continues serializing in parallel with the consumer's drain; if the consumer + /// catches up earlier, the pipe flushes sooner and the producer keeps going without waiting. + /// Peak memory: grows up to PauseWriterThreshold (~64 KB) under slow + /// consumer; close to chunk_size × 2 under fast consumer. + /// Pro: highest throughput on bounded payloads; never blocks for fast consumers; + /// pipe-managed adaptive backpressure kicks in only when actually needed. + /// Con: peak memory unbounded by chunk_size — grows to PauseWriterThreshold under + /// slow-consumer conditions; under heavy backpressure may fall back to an owned buffer + /// (losing zero-copy for that chunk). + /// Recommended when payload size is known and bounded (REST request/response, fixed-size + /// IPC message), and the consumer is reliably fast. 
+ /// + Coalesced +} diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md index 2f7a491..8cfd64f 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md +++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md @@ -146,7 +146,7 @@ The multi-message wire format (`AsyncPipeWriterOutput` + `AsyncPipeReaderInput` ### ACCORE-BIN-I-T6V2: Single fixed type per long-lived stream -**Status:** Open (intentional limit) +**Status:** Open (out of framework scope — consumer responsibility per CLAUDE.md Rule #7) **Affects:** `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput input, AcBinarySerializerOptions options)` **Reach:** any consumer that wants a long-lived `AsyncPipeReaderInput` to receive **mixed-type messages** (e.g. `Request` then `Response` then `Heartbeat` on the same connection — the typical RPC/Hub pattern). @@ -154,10 +154,20 @@ The multi-message wire format (`AsyncPipeWriterOutput` + `AsyncPipeReaderInput` **Root cause:** AcBinary's wire format does **not** include a type-discriminator before the payload. The serializer writes the object graph directly, and the deserializer must know the target type up-front. This is by design — the format is optimized for size, not for self-description. -**Why intentional:** Type discriminators (4-byte hash, length-prefixed type-name string, etc.) cost wire bytes per message and require shared registries between producer and consumer. The framework keeps these concerns out of the AcBinary core and pushes them to the **dispatch layer** above (where they can be application-tuned: short tags, hash maps, type-id enums). +**Why this stays out of scope (firm framework doctrine):** -**Workarounds:** -- **Tag-based dispatch above AcBinary**: prefix each message with an `int` (or enum) tag the consumer reads first to choose `Deserialize`. The consumer encapsulates the tag-read + type-dispatch in its own deserialization wrapper. 
+The framework explicitly does NOT provide type-dispatch infrastructure (no wire-format type-id, no registry, no dispatcher, no handshake). This aligns with: + +1. **CLAUDE.md Rule #7** — *"Tag-based transport... Request types are conventionally identified by `int` tags"* — dispatch is the consumer's concern, not the serializer's. +2. **Framework-First Design Principle** — *"Generic (reusable across any consumer)? → belongs HERE. Consumer-specific (business logic, ...)? → NOT HERE"* — type-dispatch policy (which types are registered, which handler runs for them) is inherently consumer-aware. +3. **Industry precedent** — Protobuf, MessagePack, MemoryPack, System.Text.Json, Cap'n Proto all keep their core as a pure (T ↔ bytes) serializer; type-dispatch sits in the consumer/RPC layer above (gRPC service methods, MagicOnion handlers, ASP.NET routing, SignalR `IInvocationBinder`, etc.). +4. **Existing planned Layer-1 hosts** (`AyCode.Services.SignalRs` and the planned `AyCode.Core.AspNetCore` formatter package — see [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-a8r5`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-a8r5-aspnet-core-mvc-formatters-acbinaryinputformatter--acbinaryoutputformatter)) **do not need** wire-format type-id — they get type info from their respective dispatch infrastructure (`IInvocationBinder.GetParameterTypes()`, HTTP routing → controller `[FromBody] T`). +5. **No third Layer-1 host is planned** — if a future consumer needs raw-IPC type dispatch, they build their own using one of the workarounds below. Speculative framework-side infrastructure for hypothetical consumers would violate the layer-purity doctrine. + +This is a permanent architectural decision, not a "TODO not yet done." Do not file new TODOs proposing wire-format type-id, registry, or session/handshake infrastructure inside `AyCode.Core`. 
+ +**Workarounds (canonical patterns the consumer implements):** +- **Tag-based dispatch above AcBinary** (recommended — matches CLAUDE.md Rule #7): prefix each message with an `int` (or enum) tag the consumer reads first to choose `Deserialize`. The consumer encapsulates the tag-read + type-dispatch in its own deserialization wrapper. Reference example: `AyCode.Services.Server/SignalRs/AcSignalRDataSource` uses `SignalRCrudTags`. - **Polymorphic envelope type**: define a single `T = Envelope` containing a discriminator field + raw payload bytes; the consumer deserializes the envelope, switches on the discriminator, and re-deserializes the payload as the concrete type from the inner `byte[]`. Adds a small layer of indirection but works on top of fix-T `Deserialize`. - **One input per type-stream**: separate streams per message-class. Practical when the type-set is small and the transport can afford multiple connections. diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md index 824de35..dadaaeb 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md @@ -6,18 +6,16 @@ Streaming I/O layer for the binary serializer. 
Open / resolved issues this work ## Priority legend - **P0** blocker · **P1** important · **P2** nice-to-have · **P3** idea -## ACCORE-BIN-T-D6H4: Create `AsyncPipeReaderInput` class (Step 1 of ADR-0003) -**Priority:** P1 · **Type:** Refactor · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 1 +## ACCORE-BIN-T-D6H4: ~~Create `AsyncPipeReaderInput` class~~ (Step 1 of ADR-0003) +**Status:** Closed (2026-05-02) · **Priority:** ~~P1~~ · **Type:** ~~Refactor~~ · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 1 -Add new `sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs`. Self-contained sliding-window buffer (`byte[]` + `_writePos` + `_readPos` + `_completed` + `ManualResetEventSlim`) with reset-to-0 cycling preserved verbatim from today's `SegmentBufferReader`. Producer API: `Feed(ReadOnlySpan)`, `Complete()`. Consumer API (IBinaryInputBase): `Initialize` / `TryAdvanceSegment` / `Release`. +~~Add new `sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs`. Self-contained sliding-window buffer (`byte[]` + `_writePos` + `_readPos` + `_completed` + `ManualResetEventSlim`) with reset-to-0 cycling preserved verbatim from today's `SegmentBufferReader`. Producer API: `Feed(ReadOnlySpan)`, `Complete()`. Consumer API (IBinaryInputBase): `Initialize` / `TryAdvanceSegment` / `Release`.~~ -Existing `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` remain unchanged in this step — they keep serving the SignalR `AcBinaryHubProtocol.TryParseChunkData` path. Migration to the new class is in Step 6 (`ACCORE-SBP-T-G7T2`). 
+### Resolution (2026-05-02) -**Naming rationale:** `AsyncPipeReaderInput` mirrors the existing send-side `AsyncPipeWriterOutput`. The `Async` prefix follows .NET BCL convention for type-level naming (`AsyncEnumerable`, `IAsyncDisposable`, `AsyncLocal`). +Class shipped at `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs` — sealed class implementing `IBinaryInputBase, IDisposable`. Producer API: `Feed(ReadOnlySpan)`, `Complete()`. Consumer API: `Initialize` / `TryAdvanceSegment` / `Release` / `MessageDone` (the latter added during the multi-message-reuse work, atomically resets buffer cursors at message boundaries). Framed-wire-format awareness via `multiMessage: true` ctor flag; raw-bytes pass-through via `multiMessage: false`. Class summary contains the "When chunked-streaming is the right fit" use-case catalogue (push-pattern only). Companion struct `AsyncPipeReaderInputAdapter` bridges to the deserializer's generic `TInput : struct, IBinaryInputBase` constraint with zero heap cost. -**Acceptance:** -- New class compiles; isolated unit tests cover `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts (incl. producer-consumer concurrency, missed-signal double-check, grow-buffer handoff race). -- Existing SignalR tests continue to pass on the unchanged `SegmentBufferReader` path (no behavioral regression). +Tests in `AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs` cover `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts including framing-state-machine, multi-chunk handoff, sliding-window cycle, and producer-consumer concurrency. Existing SignalR tests on the legacy `SegmentBufferReader` path continue to pass (no behavioral regression). 
## ACCORE-BIN-T-M2K1: ~~Add `AsyncPipeReaderInput.DrainFromAsync` extension~~ (Step 2 of ADR-0003) **Status:** Closed (2026-05-02) — moved to test-only assembly · **Priority:** ~~P1~~ · **Type:** ~~Feature~~ · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 2 · **Depends on:** `ACCORE-BIN-T-D6H4` @@ -38,37 +36,14 @@ The drain-extension was originally added as a public framework helper, paired wi **Acceptance:** existing `AcBinarySerializerPipeParallelTests` and `AcBinarySerializerNamedPipeTests` continue to pass on the new test-only extension; benchmark (`Console.AsyncPipe` mode) continues to work. -## ACCORE-BIN-T-V7C9: Replace misleading parallel test with real parallel pipeline test (Step 3 of ADR-0003) -**Priority:** P1 · **Type:** Test · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 3 · **Depends on:** `ACCORE-BIN-T-M2K1` +## ACCORE-BIN-T-V7C9: ~~Replace misleading parallel test with real parallel pipeline test~~ (Step 3 of ADR-0003) +**Status:** Closed (2026-05-02) · **Priority:** ~~P1~~ · **Type:** ~~Test~~ · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 3 · **Depends on:** `ACCORE-BIN-T-M2K1` -The current `AcBinarySerializerPipeParallelTests.cs` is misleading — it does not actually exercise serializer↔deserializer parallelism (single-threaded in practice). Rewrite to drive a producer thread (serializer) and a consumer thread (deserializer) through an in-memory `Pipe`, with `AsyncPipeReaderInput.DrainFromAsync` on the receive side. Measure ser+deser overlap and verify the ADR-0003 claimed ~1 µs / MB perf delta vs today's struct-based path. 
+~~The current `AcBinarySerializerPipeParallelTests.cs` is misleading — it does not actually exercise serializer↔deserializer parallelism (single-threaded in practice). Rewrite to drive a producer thread (serializer) and a consumer thread (deserializer) through an in-memory `Pipe`, with `AsyncPipeReaderInput.DrainFromAsync` on the receive side. Measure ser+deser overlap and verify the ADR-0003 claimed ~1 µs / MB perf delta vs today's struct-based path.~~ -**Acceptance:** -- Test passes consistently on Windows + Linux CI. -- Measured perf delta documented in test output / commit message. -- Test serves as regression guard for future receive-side changes (no silent perf-cliff regression goes undetected). +### Resolution (2026-05-02) -## ACCORE-BIN-T-A3T8: Add NamedPipe helpers — `SerializeToNamedPipeAsync` / `DeserializeFromNamedPipeAsync` (Step 4 of ADR-0003) -**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 4 · **Depends on:** `ACCORE-BIN-T-V7C9` - -Add static extension methods on `AcBinarySerializerOptions` for full NamedPipe IPC lifecycle (one-shot send / receive). New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerNamedPipeExtensions.cs`. Send: `NamedPipeServerStream` → `PipeWriter.Create(stream)` → `AsyncPipeWriterOutput`. Receive: `NamedPipeClientStream` → `PipeReader.Create(stream)` → `AsyncPipeReaderInput.DrainFromAsync`. - -Cross-platform: Windows + Linux (Unix-domain-socket via NamedPipe BCL API). WASM throws `PlatformNotSupportedException` per BCL contract. - -**Acceptance:** -- Cross-platform integration test: roundtrip a complex object graph through a NamedPipe; assert structural equality. -- WASM build does not link these helpers (or throws clear PNS at runtime if invoked). 
- -## ACCORE-BIN-T-B5Y6: Add FileStream helpers — `SerializeToFileStreamAsync` / `DeserializeFromFileStreamAsync` (Step 5 of ADR-0003) -**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 5 · **Depends on:** `ACCORE-BIN-T-A3T8` - -Add static extension methods on `AcBinarySerializerOptions` for streaming file I/O. New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerFileStreamExtensions.cs`. Send: `FileStream.Create(path)` → `PipeWriter.Create(fileStream)` → `AsyncPipeWriterOutput`. Receive: `FileStream.OpenRead(path)` → `PipeReader.Create(fileStream)` → `AsyncPipeReaderInput.DrainFromAsync`. - -**Critical streaming-doctrine invariant:** peak buffer memory bounded by `BufferWriterChunkSize × 2` (~8 KB at default), regardless of file size. **NOT file-size-aware** — do not pre-allocate to file size (would defeat streaming and break zerocopy / zeroalloc). - -**Acceptance:** -- Large-file roundtrip test (≥ 100 MB) passes with memory profiler showing peak buffer ≤ 16 KB throughout. -- Full structural equality of round-tripped object. +`AcBinarySerializerPipeParallelTests.RealParallelPipeline_*` tests added: 3-task pipeline (serializer Task.Run → drain Task → deserializer Task.Run) running concurrently over an in-memory `Pipe`, exercised at both `BufferWriterChunkSize=256` (small payload, multi-chunk crossing) and `BufferWriterChunkSize=4096` + production-scale payload from `TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100)` (~3700 nested objects, hundreds of KB → many chunks → real backpressure-driven streaming). Uses the test-only `AsyncPipeReaderInputExtensions.DrainFromAsync` (per `M2K1` resolution). Asserts payload structural equality on round-trip; no perf-delta assertion encoded — the dedicated benchmark suite (`Console.AsyncPipe` mode) covers performance regression detection. 
Tests pass on Windows; Linux CI not yet wired but no platform-specific code paths exist. ## ACCORE-BIN-T-R5K2: Multi-message reuse for AsyncPipeReaderInput **Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported) @@ -137,18 +112,18 @@ Possible directions for a failover / reconnect scenario (none committed): Each has different thread-safety implications and wire-formatting reentrancy requirements; explore before any code. -## ACCORE-BIN-T-S5N2: Pattern catalogue in the public class summary -**Status:** Partially Resolved (2026-05-02) · **Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface) +## ACCORE-BIN-T-S5N2: ~~Pattern catalogue in the public class summary~~ +**Status:** Closed (2026-05-02) · **Priority:** ~~P3~~ · **Type:** ~~Documentation~~ · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface) -Original scope: extend the `AsyncPipeReaderInput` class summary with an explicit producer-consumer pattern catalogue (strictly-sequential single-thread, multi-thread feed+deserialize, push/event-driven, one-shot) with recommended API per pattern. 
+~~Original scope: extend the `AsyncPipeReaderInput` class summary with an explicit producer-consumer pattern catalogue (strictly-sequential single-thread, multi-thread feed+deserialize, push/event-driven, one-shot) with recommended API per pattern.~~ -### Resolution (2026-05-02): broader scope captured +### Resolution (2026-05-02) Both `AsyncPipeReaderInput` (reader-side) and `AsyncPipeWriterOutput` (writer-side) class summaries now contain a **"When chunked-streaming is the right fit"** catalogue covering the realistic transport scenarios — not just SignalR / NamedPipe-only. The list explicitly enumerates: network transports (TCP/UDP/WebSocket/SSE/HTTP/2), multi-connection servers (SignalR/gRPC/proprietary RPC), message brokers (Kafka/Redis Streams/Service Bus), file streaming (`FileStream`-backed `PipeReader`/`PipeWriter`), in-memory cross-thread `Pipe`, and custom transport adapters. The complementary "When raw `byte[]` is the right fit" section calls out the loopback-IPC / sub-LOH-message / no-GC-pressure case where chunked overhead is visible without commensurate benefit. The single-class API surface reflects **push pattern only** (the dominant pattern across all listed use cases); the more granular threading-model patterns from the original S5N2 scope (single-thread vs multi-thread feed+deserialize) are now covered by the **explicit "consumer's reader-task drives the transport" framing** in the class summary — applications choose their threading model, the input class is agnostic. -**Open follow-up**: a dedicated `BINARY/CHOOSING_API.md` (or a section in `BINARY/README.md`) cross-cutting the `byte[]` vs `IBufferWriter` vs `AsyncPipeReaderInput`/`AsyncPipeWriterOutput` decision could lift this from the class XML-doc to navigable doc-folder content. Tracked separately if/when a NuGet consumer hits the discoverability gap. 
+The dedicated `BINARY/CHOOSING_API.md` follow-up (cross-cutting `byte[]` vs `IBufferWriter` vs `AsyncPipeReaderInput`/`AsyncPipeWriterOutput` decision tree as navigable doc-folder content) is **dropped** — the class XML doc-comment surface is sufficient for the IDE-driven discovery flow that NuGet consumers actually use. Re-open as a new TODO if a NuGet consumer reports a discoverability gap. ## ACCORE-BIN-T-G3W8: Transport-buffer alignment best-practice doc section **Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2-chunk-on-wire-size--chunksize--headersize-caused-page-fragmentation) @@ -209,3 +184,44 @@ Per side: - Concurrent multi-producer test: N producer threads simultaneously serialise into N stream-id-distinct writers; receiver demultiplexes and verifies each stream's graph independently. No interleaving corruption. - Microbenchmark: single-stream sequential RT regresses by ≤ ~5 ns/op vs current (a few branch-predicted `if` per chunk; not strategy-generic-grade zero-overhead but production-invisible). - Wire-format documentation updated in [`BINARY_FORMAT.md`](BINARY_FORMAT.md) with the `[0xC8]` mux-data activation and the chosen end-marker shape. + +## ACCORE-BIN-T-A8R5: ASP.NET Core MVC formatters (`AcBinaryInputFormatter` / `AcBinaryOutputFormatter`) +**Priority:** P3 · **Type:** Feature (separate package) · **Related:** none yet + +A separate NuGet package `AyCode.Core.AspNetCore` providing zero-friction integration of AcBinary serialization into ASP.NET Core MVC controllers. Single-line activation: +```csharp +builder.Services.AddControllers().AddAcBinaryFormatters(); +``` + +Once registered, controllers using `[FromBody] T model` and `IActionResult` automatically use AcBinary: request bodies sent with `Content-Type: application/x-acbinary` are read by the input formatter, and responses are written by the output formatter when the client sends `Accept: application/x-acbinary` (content-negotiation handles the JSON-vs-AcBinary fallback transparently).
+ +**Why a separate package**: `AyCode.Core` is layer 0 (no framework dependencies). ASP.NET Core integration would belong in a layer-1 consumer-aware package — typical pattern for serialization libraries that want to avoid forcing ASP.NET Core dependency on every consumer. Layer-correctness preserved. + +**Why this specific feature matters**: turns the chunked-streaming infrastructure from a "library primitive that you have to wire up yourself" into a "production-grade REST API content type". The Pipe-streaming path becomes the natural production setup — `Response.BodyWriter` (PipeWriter) and `Request.BodyReader` (PipeReader) are native ASP.NET Core types, so the formatter can drive them directly with `SerializeChunked` / `AsyncPipeReaderInput.Feed` patterns. Per-connection peak memory bounded by chunk-size; LOH allocation pressure (≥ 85 KB messages) avoided — the chunked-streaming value-proposition surfaces automatically. + +**Possible directions** (none fixed; pre-design): + +- **InputFormatter shape**: `AcBinaryInputFormatter : InputFormatter` with `SupportedMediaTypes.Add("application/x-acbinary")`. The `ReadRequestBodyAsync` reads from `context.HttpContext.Request.BodyReader` (PipeReader) — either drains to byte[] for simple cases (size-bounded by the server's request-body limit, i.e. `[RequestSizeLimit]` / `MaxRequestBodySize`), or uses the `AsyncPipeReaderInput` + drain-task pattern for low-memory streaming on huge payloads. Decide which is the default; expose the streaming variant as opt-in. + +- **OutputFormatter shape**: `AcBinaryOutputFormatter : OutputFormatter` writing to `context.HttpContext.Response.BodyWriter` (PipeWriter). The `WriteResponseBodyAsync` calls `AcBinarySerializer.SerializeChunked(value, pipeWriter, options)` (raw — single-message-per-request, no [201]/[202] framing needed). Optional `FlushPolicy.Coalesced` tuning for higher throughput at the cost of owned-buffer fallback risk.
+ +- **Wire-format choice**: raw chunked stream (`SerializeChunked`) is the natural fit for HTTP single-request-single-response. The multi-message framed variant (`SerializeChunkedFramed`) is over-engineered for REST — there's no concept of "next message on this stream" within a single HTTP request. Document this choice clearly. + +- **Content-Type negotiation**: `application/x-acbinary` (vendor-specific MIME) vs `application/vnd.aycode.binary` (RFC-style vendor). Pick before publishing — once consumers integrate the MIME type into their `Accept` headers, changing it is a breaking change. + +- **Options propagation**: the formatter ctor accepts `AcBinarySerializerOptions?` (nullable, defaults to `Default`). DI registration via `IOptions` is also supported. `AddAcBinaryFormatters(options => { options.UseGeneratedCode = true; ... })` builder-pattern overload for declarative configuration. + +- **Action result wrapping**: optional `AcBinaryResult : ActionResult` for explicit Content-Type opt-out (when the action wants to force AcBinary even if the client didn't request it). Decide whether to ship. + +- **`IAsyncEnumerable` server-streaming**: gRPC-like server-streaming over a single HTTP response. The OutputFormatter can detect `IAsyncEnumerable` and stream items as multi-message chunks (`SerializeChunkedFramed` per item). Big feature, ship in v1 vs v2 — decide. + +**Acceptance:** +- New package `AyCode.Core.AspNetCore` published independently with its own README and integration tests (TestServer-based round-trip). +- `[FromBody]` deserialization works on a controller for any `[AcBinarySerializable]` model. +- Action returning a model serialises via AcBinary when `Accept: application/x-acbinary`. +- Falls back transparently to JSON (default ASP.NET formatter) when the client doesn't request AcBinary — coexistence with default formatters. 
+- README documents content-negotiation, MIME type, wire-format choice, options configuration, performance trade-offs vs default JSON. +- Memory-pressure benchmark (multi-connection load test): demonstrates the bounded-peak-memory property over JSON baseline. + +**Sequencing**: implement AFTER `AyCode.Core` reaches its first stable NuGet release (1.0). The formatter package versioning follows `AyCode.Core` — breaking changes there propagate here. Don't ship the formatter package as part of v1.0 — let `AyCode.Core` stabilize first, then add this as a 1.1 / 2.0 optional extension. + diff --git a/AyCode.Core/docs/BINARY/BINARY_TODO.md b/AyCode.Core/docs/BINARY/BINARY_TODO.md index 274be49..f98d630 100644 --- a/AyCode.Core/docs/BINARY/BINARY_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_TODO.md @@ -138,7 +138,7 @@ Pick one before touching code. Option 2 is the most correct but adds API surface - `AcBinarySerializer.cs` × 3 sites (`ArrayBinaryOutput` ctor, `BufferWriterBinaryOutput` ctor, `AsyncPipeWriterOutput` ctor) - `AcBinaryDeserializer.cs` × 1 site (receive-side initial capacity derivation) - `AsyncPipeReaderInput.cs`, `SegmentBufferReader.cs`, `SegmentBufferReaderInput.cs` — XML doc cross-refs -- `BINARY_WRITERS.md`, `BINARY_TODO.md` (this entry, plus the streaming-doctrine invariant in `ACCORE-BIN-T-B5Y6`), `BINARY_ISSUES.md` (line 151 — already lists `BufferWriterChunkSize` among the struct-mutation issue's affected setters) +- `BINARY_WRITERS.md`, `BINARY_TODO.md` (this entry), `BINARY_ISSUES.md` (line 151 — already lists `BufferWriterChunkSize` among the struct-mutation issue's affected setters) - Consumer-side: `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` ctor mutates `_options.BufferWriterChunkSize = options.BufferSize;` — see `BINARY_ISSUES.md#accore-bin-i-...` (struct-mutation context). Coordinate the rename with the struct-mutation fix to avoid two cross-cutting churn waves on the same property. 
**Acceptance:** @@ -429,3 +429,60 @@ The latent thread-safety problem documented in `ACCORE-BIN-I-L8N5` — mutable ` - `BINARY_ISSUES.md#accore-bin-i-l8n5` Status updated to `Closed (YYYY-MM-DD)` with a `### Resolution` sub-section pointing to this TODO + the implementing commit. - Doc-string on `AcBinarySerializerOptions` documents the freeze-on-first-use contract; `BINARY_FEATURES.md` or `BINARY_OPTIONS.md` cross-references the BCL-precedent (`JsonSerializerOptions.MakeReadOnly`). +## ACCORE-BIN-T-F8N3: Switch source-generator type-name hashing from simple-name to fully-qualified-name +**Priority:** P3 · **Type:** Refactor · **Related:** [`ACCORE-BIN-T-I3P8`](#accore-bin-t-i3p8-acbinarytypeid-attribute--explicit-type-id-override) (override mechanism for residual collisions) + +The source generator's `ComputeFnvHash(typeSymbol.Name)` uses the **simple name only** (e.g. `"User"`, not `"MyApp.A.User"`). Cross-namespace types with the same simple name silently collide on `s_typeNameHash`. The hash is currently only consumed by the `WireMode=Metadata` inline metadata-write path (cross-version property compat) — the framework explicitly does NOT add wire-format type-id (per CLAUDE.md Rule #7: type-dispatch is consumer responsibility, see [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-t6v2`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-t6v2-single-fixed-type-per-long-lived-stream)). Within `UseMetadata`, the simple-name collision can still cause silent property-set mismatches between two types with the same short name in different namespaces — this TODO fixes that. + +**Change scope** (`AcBinarySourceGenerator.cs`) — 4 call sites: `ComputeFnvHash(typeSymbol.Name)` → `ComputeFnvHash(typeSymbol.ToDisplayString())`: +- Self type-name hash (~line 358) +- Child type-name hash (~line 157) +- Element type-name hash (~line 254) +- Dict-value type-name hash (~line 311) + +No runtime code changes; output regenerates with new constants on next build. 
+ +**Breaking change scope:** any saved binary stream that uses `WireMode=Metadata` and was produced by an older version embeds the old simple-name hash; consumers reading those streams with the new hash compute would mismatch and throw. Pre-1.0: acceptable. Post-1.0 would require a `WireMode=Metadata` format-version bump. + +**Acceptance:** +- All `*_GeneratedWriter.g.cs` files regenerate with FQN-based `s_typeNameHash` values. +- Existing tests pass (auto-regen propagates; no manual hash literals in tests). +- Wire format identical for `WireMode=Compact` (no metadata embedded). +- `UseMetadata=true` paths produce different hashes — explicitly tested via round-trip. + +## ACCORE-BIN-T-I3P8: `[AcBinaryTypeId(...)]` attribute — explicit type-id override +**Priority:** P3 · **Type:** Feature · **Related:** [`ACCORE-BIN-T-F8N3`](#accore-bin-t-f8n3-switch-source-generator-type-name-hashing-from-simple-name-to-fully-qualified-name) (FQN base hash being overridden) + +Once `ACCORE-BIN-T-F8N3` reduces collision frequency by switching to FQN, residual FQN-hash collisions are still possible (32-bit hash space, birthday paradox). Currently the only consumer of `s_typeNameHash` is the `WireMode=Metadata` inline metadata-write path — a residual collision there causes a silent property-set mismatch. + +`[AcBinaryTypeId(0x12345)]` attribute on a class: +- Source generator emits `s_typeNameHash = 0x12345` instead of computing FNV. +- Two types with the same `[AcBinaryTypeId(...)]` value → compile-time / first-use error. + +**Useful for:** +- Resolving rare FQN-hash collisions deterministically (within `WireMode=Metadata`). +- Pinning a stable type-id across class renames (wire-compat across versions in `Metadata` mode). +- Future-proofing: if a Layer 1 consumer (hypothetically) builds a type-dispatch above AcBinary using `s_typeNameHash`, the same override mechanism applies. + +**Acceptance:** +- New attribute class shipped alongside `[AcBinarySerializable]`. 
+- Generator honours the override (emits explicit constant instead of FNV result). +- Tests: rename a class with `[AcBinaryTypeId]` → `s_typeNameHash` unchanged. + +## ACCORE-BIN-T-X2M5: Evaluate xxHash3 vs FNV-1a for type-name hashes +**Priority:** P3 · **Type:** Investigation · **Related:** [`ACCORE-BIN-T-F8N3`](#accore-bin-t-f8n3-switch-source-generator-type-name-hashing-from-simple-name-to-fully-qualified-name) + +FNV-1a is currently used for both `s_typeNameHash` and `s_propertyHashes`. For compile-time hashing, performance is irrelevant. For collision resistance: +- FNV-1a 32-bit: ~50% collision at ~77K types (birthday paradox). Adequate for small/medium projects, marginal for large ones with many auto-generated types. +- xxHash 32-bit (XXH32, or XXH3 truncated to 32 bits — XXH3 itself has no native 32-bit output): comparable mathematical properties to FNV-1a (both non-cryptographic), and the same 32-bit birthday bound. +- xxHash3 64-bit: dramatically better collision resistance (~50% at ~5B entries), at the cost of 8 wire bytes instead of 4. + +**Trigger:** real collisions observed (1000+ types per assembly + cross-assembly aggregation), or community feedback indicating collision pain. + +**Investigation questions** (no code change without a triggering pain signal): +1. Switch to a 32-bit xxHash (incremental improvement) — but doubles the change scope (touch property hashes too if uniformity desired). +2. Switch to xxHash3 64-bit (8 wire bytes instead of 4) — meaningful collision resistance, modest wire cost. +3. Stay on FNV-1a + force `[AcBinaryTypeId]` for collisions — minimal change, devops burden. + +Investigation only — defer until pain signal arrives.
+ diff --git a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md index 53bdc11..5c92750 100644 --- a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md +++ b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md @@ -113,16 +113,19 @@ Each chunk has a 3-byte header reserved via **header reservation** (skip 3 bytes ### Backpressure Modes -Constructor parameter `waitForFlush` (default `true`): +Constructor parameter `flushPolicy` of type `FlushPolicy` (default `FlushPolicy.DoubleBuffered`): -- **`waitForFlush=true`**: `Grow()` blocks if previous `FlushAsync` is still in-flight. Max ~2 chunks in memory. -- **`waitForFlush=false`**: `Grow()` never blocks. Data accumulates in PipeWriter's internal buffer and is sent with the next completed flush. Maximum serialization throughput. +- **`FlushPolicy.PerChunk`**: `Grow()` commits → flushes → awaits → acquires next chunk. Strictly bounded peak memory (~chunk_size × 1). No producer/flush parallelism — wall-clock = sum of (serialize + flush) per chunk. Auto-applied on Stream-backed PipeWriter regardless of policy. Recommended for memory-sensitive scenarios where payload size is unpredictable. +- **`FlushPolicy.DoubleBuffered`** (default): `Grow()` is fire-and-forget for the previous flush; only blocks at the NEXT chunk's `Grow` if the previous flush hasn't completed. Peak memory ~chunk_size × 2 (current + previous overlapping). Maximum producer/flush parallelism with bounded memory — wall-clock = max(serialize, flush) × N_chunks. The recommended balanced default for typical streaming. +- **`FlushPolicy.Coalesced`**: `Grow()` does not wait per-chunk. The underlying Pipe coalesces flushes adaptively up to its `PauseWriterThreshold` (~64 KB committed bytes). Highest serialization throughput on bounded payloads; memory grows to `PauseWriterThreshold` under slow-consumer conditions. -In both modes, flush is only initiated when `_lastFlush.IsCompleted` — no overlapping FlushAsync calls. 
+In all three modes, flush is only initiated when `_lastFlush.IsCompleted` — no overlapping FlushAsync calls. + +> **Migration note**: `FlushPolicy` replaces the historical `bool waitForFlush` parameter. Mapping: old `true` → `FlushPolicy.DoubleBuffered`, old `false` → `FlushPolicy.Coalesced`. The new `FlushPolicy.PerChunk` value is a NEW capability that previously was only auto-applied on Stream-backed PipeWriter; it can now be explicitly chosen on Pipe-based writers for strictly bounded peak memory. ### Two parallel-flush regimes (auto-detected) -Runtime check `pipeWriter.GetType()` splits flush behavior into two regimes — auto-detected at ctor via `_serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter)`. No caller intervention. Orthogonal to `waitForFlush` and to the wire-format mode choice (`Bytes` / `Segment` / `AsyncSegment`). +Runtime check `pipeWriter.GetType()` splits flush behavior into two regimes — auto-detected at ctor via `_serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter)`. No caller intervention. Orthogonal to `FlushPolicy` and to the wire-format mode choice (`Bytes` / `Segment` / `AsyncSegment`). **True parallel** — Pipe-based / parallel-capable PipeWriters: `new Pipe().Writer`, Kestrel transport output, custom parallel-capable impls. `Grow()` uses `FlushAsync().Forget()` pattern: serializer continues with the next chunk while the network async-flushes the previous one. Round-trip wall-clock = `max(serialize, flush) × N_chunks` — flush hides behind serialize-time. Production-stable on SignalR / Kestrel; "minimally slower than raw byte[]" empirically. diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 900e5b6..5753236 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -77,9 +77,9 @@ public class AcBinaryHubProtocol : IHubProtocol protected readonly ILogger? 
_logger; /// - /// AsyncSegment per-chunk flush synchronization — see . + /// AsyncSegment per-chunk flush synchronization — see . /// - protected readonly bool _waitForFlush; + protected readonly FlushPolicy _flushPolicy; /// /// Per-flush wait limit — see . @@ -142,7 +142,7 @@ public class AcBinaryHubProtocol : IHubProtocol _protocolMode = options.ProtocolMode; _logger = options.Logger; - _waitForFlush = options.WaitForFlush; + _flushPolicy = options.FlushPolicy; _flushTimeout = options.FlushTimeout; Name = options.Name; @@ -150,9 +150,9 @@ public class AcBinaryHubProtocol : IHubProtocol _chunkStates = new ConditionalWeakTable(); _logger?.LogInformation( - "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", + "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} flushPolicy={FlushPolicy} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, - _waitForFlush, _flushTimeout.TotalMilliseconds, + _flushPolicy, _flushTimeout.TotalMilliseconds, _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); } @@ -509,7 +509,7 @@ public class AcBinaryHubProtocol : IHubProtocol // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- if (streamedArg != null) { - dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _waitForFlush, _flushTimeout); + dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _flushPolicy, _flushTimeout); _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", 
dataBytes); } diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs index c20f428..0bc4fc1 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs @@ -27,17 +27,20 @@ public sealed class AcBinaryHubProtocolOptions /// /// Per-chunk flush synchronization in <see cref="BinaryProtocolMode.AsyncSegment"/> mode. /// - /// true (default): every Grow waits for the previous FlushAsync to complete - /// before starting the next chunk. Guarantees end-to-end zero-copy (no owned-buffer fallback) - /// and maximum pipeline parallelism. Best for high-throughput servers with fast consumers. - /// false: fire-and-forget flush per chunk; blocks only when committed bytes exceed - /// the memory threshold (~60 KB). Itself an adaptive backpressure mode — a fast consumer - /// never triggers a wait, a slow consumer naturally throttles through buffer pressure. - /// Safer for mixed consumer speeds and memory-sensitive environments. + /// <see cref="FlushPolicy.PerChunk"/>: commit → flush → await each chunk before + /// acquiring the next. Strictly bounded peak memory (~chunk × 1). No producer/flush + /// parallelism. Best for memory-sensitive servers and unpredictable payload sizes. + /// <see cref="FlushPolicy.DoubleBuffered"/>: fire-and-forget previous flush; next chunk + /// waits only if previous flush hasn't completed. Peak memory ~chunk × 2 with maximum + /// producer/flush parallelism — the recommended balanced choice for typical streaming. + /// <see cref="FlushPolicy.Coalesced"/> (default for SignalR): no per-chunk wait; the + /// underlying Pipe coalesces flushes adaptively up to its PauseWriterThreshold (~64 KB). + /// Highest throughput on bounded payloads — fast consumer never triggers a wait, slow + /// consumer naturally throttles through buffer pressure. /// /// Ignored for Bytes and Segment modes.
/// - public bool WaitForFlush { get; set; } = false; + public FlushPolicy FlushPolicy { get; set; } = FlushPolicy.Coalesced; /// /// Maximum wait for a single synchronous FlushAsync before throwing @@ -106,7 +109,7 @@ public sealed class AcBinaryHubProtocolOptions SerializerOptions = SerializerOptions, ProtocolMode = ProtocolMode, BufferSize = BufferSize, - WaitForFlush = WaitForFlush, + FlushPolicy = FlushPolicy, FlushTimeout = FlushTimeout, Name = Name, Logger = Logger diff --git a/AyCode.Services/docs/SIGNALR/SIGNALR_ISSUES.md b/AyCode.Services/docs/SIGNALR/SIGNALR_ISSUES.md index 6deaa5d..f0b3eb4 100644 --- a/AyCode.Services/docs/SIGNALR/SIGNALR_ISSUES.md +++ b/AyCode.Services/docs/SIGNALR/SIGNALR_ISSUES.md @@ -79,7 +79,7 @@ Current `PluginNopStartup.ConfigureServices`: ``` What's missing: -- No `services.Configure(configuration.GetSection("AyCode:SignalR:Protocol"))` → `ProtocolMode`, `BufferSize`, `WaitForFlush`, `FlushTimeout` are all hardcoded / default. +- No `services.Configure(configuration.GetSection("AyCode:SignalR:Protocol"))` → `ProtocolMode`, `BufferSize`, `FlushPolicy`, `FlushTimeout` are all hardcoded / default. - The `appsettings.json` has no `AyCode:SignalR` (or equivalent) section at all — so per-deploy tuning (e.g. increasing `FlushTimeout` for a satellite link, switching `ProtocolMode` for diagnostics) requires a code change + redeploy. - Manual `new Logger(...)` sidesteps the DI `ILogger` auto-resolution that `BuildProtocol` provides → creates a parallel logger instance (see `../../../AyCode.Core/docs/LOGGING/LOGGING_ISSUES.md#accore-log-i-m4c9`). 
diff --git a/AyCode.Services/docs/SIGNALR/SIGNALR_TODO.md b/AyCode.Services/docs/SIGNALR/SIGNALR_TODO.md index c436ab7..2d46f7b 100644 --- a/AyCode.Services/docs/SIGNALR/SIGNALR_TODO.md +++ b/AyCode.Services/docs/SIGNALR/SIGNALR_TODO.md @@ -56,7 +56,7 @@ services.AddSignalR(hubOptions => { /* unchanged */ }) "Protocol": { "ProtocolMode": "AsyncSegment", "BufferSize": 4096, - "WaitForFlush": true, + "FlushPolicy": "PerChunk", "FlushTimeout": "00:00:10" } } diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md index 2fbfaf3..4fb8899 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/README.md @@ -159,19 +159,20 @@ Hub protocol settings via **`AcBinaryHubProtocolOptions`** (mutable class). Pass | `SerializerOptions` | `AcBinarySerializerOptions.Default` | Binary serializer options (also usable standalone via `ToBinary`/`BinaryTo`). | | `ProtocolMode` | `Bytes` | Wire format and pipeline strategy — see **BinaryProtocolMode** below. | | `BufferSize` | 4096 | Per-chunk size. 4 KB aligns with Kestrel's slab. Max 65535 (UINT16). | -| `WaitForFlush` | `true` | AsyncSegment flush strategy — see trade-off below. | +| `FlushPolicy` | `Coalesced` | AsyncSegment flush strategy — see trade-off below. | | `FlushTimeout` | 10 s | Per-flush wait limit. `Timeout.InfiniteTimeSpan` = disabled. | | `Name` | `"acbinary"` | SignalR handshake protocol name. Client and server must match. | | `Logger` | `null` | Optional `ILogger`; injected from DI when registered. | Inner `AcBinarySerializerOptions` defaults relevant for SignalR: `UseGeneratedCode=true` (hybrid source-gen + reflection), `UseStringInterning=All`, `InitialBufferCapacity=16384`. 
-### `WaitForFlush` (AsyncSegment-only) +### `FlushPolicy` (AsyncSegment-only) -| Value | Pro | Con | -|-------|-----|-----| -| **`true`** (default) | Max pipeline parallelism + guaranteed end-to-end zero-copy on send. | Slow consumer propagates back as server-thread blocking (bounded by `FlushTimeout`). | -| **`false`** | Adaptive — fire-and-forget per chunk, blocks only when ~60 KB memory threshold is hit. | Under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. | +| Value | Peak memory | Pro | Con | +|-------|------------|-----|-----| +| **`FlushPolicy.PerChunk`** | ~chunk × 1 | Strictly bounded peak memory regardless of consumer speed; guaranteed end-to-end zero-copy. | No producer/flush parallelism — wall-clock = sum of (serialize + flush) per chunk. | +| **`FlushPolicy.DoubleBuffered`** | ~chunk × 2 | Maximum producer/flush parallelism with bounded memory — wall-clock = max(serialize, flush) × N_chunks. Recommended for balanced streaming. | Slow consumer propagates back as server-thread blocking at next `Grow` (bounded by `FlushTimeout`). | +| **`FlushPolicy.Coalesced`** (default) | up to ~64 KB | Highest throughput on bounded payloads — pipe coalesces flushes up to `PauseWriterThreshold`. Never blocks on fast consumers. | Peak memory grows to `PauseWriterThreshold` under slow consumer; under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. 
| ### `FlushTimeout` rationale (10 s default) diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md index 0c59995..e380611 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md @@ -46,7 +46,7 @@ Alternative to wire-detection: use SignalR handshake message's `extensions` JSON Zero first-message overhead, fully explicit. Both sides advertise their send-modes; pick intersection. Specification to be drafted; compatibility with non-AC clients (pure JSON etc.) must remain. ## ACCORE-SBP-T-G7T2: Migrate `AcBinaryHubProtocol.TryParseChunkData` to `AsyncPipeReaderInput`; delete `SegmentBufferReader` + `SegmentBufferReaderInput` (Step 6 of ADR-0003) -**Priority:** P1 · **Type:** Refactor · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 6 · **Depends on:** `ACCORE-BIN-T-B5Y6` (and all earlier BIN steps) +**Priority:** P1 · **Type:** Refactor · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 6 · **Depends on:** `ACCORE-BIN-T-V7C9` (last completed step in the BIN streaming-framework chain — Steps 4 & 5 NamedPipe / FileStream helpers were dropped; framework stays transport-agnostic) Switch `AcBinaryHubProtocol.TryParseChunkData` from `SegmentBufferReader.Write(span)` to `AsyncPipeReaderInput.Feed(span)`. Update `AsyncChunkState` field type from `SegmentBufferReader Buffer` to `AsyncPipeReaderInput Input`. Lazy `Task.Run` deser-task start (after first chunk), `CHUNK_END` lifecycle (`Complete()` + `Dispose()` + `_chunkStates.Remove`), and the WASM synchronous-deser path all preserved. Wire format unchanged.