diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index 185851e..a879f29 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -1,6 +1,7 @@ using AyCode.Core.Compression; using AyCode.Core.Serializers.Attributes; using AyCode.Core.Serializers.Binaries; +using AyCode.Core.Tests.Serialization; // DrainFromAsync extension (test-only, used by benchmark) using AyCode.Core.Tests.TestModels; using MemoryPack; using MessagePack; @@ -42,8 +43,8 @@ public static class Program private static int TestIterations = 1; private static int BenchmarkSamples = 1; // Debug: single sample, fast iteration #else - private static int WarmupIterations = 5000; //5000 - private static int TestIterations = 1000; //1000 + private static int WarmupIterations = 100; //5000 + private static int TestIterations = 10; //1000 private static int BenchmarkSamples = 3; #endif @@ -61,6 +62,15 @@ public static class Program private const string IoString = "String"; private const string IoNamedPipe = "NamedPipe"; private const string IoNamedPipeRaw = "NamedPipe"; + private const string IoInMemoryPipe = "Pipe(in-mem)"; + private const string IoInMemoryRaw = "Bytes(in-mem)"; + + // Single source of truth for the chunk size used by ALL pipe-related benchmarks (NamedPipe PipeChunk, + // NamedPipe PipeRaw, in-memory Pipe, in-memory RawMem) AND the NamedPipe server's inBufferSize/outBufferSize. + // Same value across both layers ensures apples-to-apples comparison: chunked-streaming chunk-on-wire size + // matches the kernel pipe-buffer slot exactly. Tweak HERE when experimenting; do NOT scatter chunkSize + // overrides across individual benchmark rows. + private const int PipeChunkSize = 4096; // Dispatch mode identifiers — describes how property access / type dispatch happens for a given run. // SGen = compile-time source generator path (Unsafe.As direct fields, slot-array wrapper lookup). 
@@ -152,68 +162,97 @@ public static class Program // Done early so user is told immediately, not after warmup. ValidateMemoryPackSetup(); - // Determine layer (which test data to run), opMode (ser/des/all), and serializerMode (standard/asyncpipe). - // CLI args take precedence; if no args, show interactive menu. - // serializerMode: "standard" = all serializers EXCEPT AsyncPipe; "asyncpipe" = ONLY the AsyncPipe streaming benchmark. - // The two are mutually exclusive — AsyncPipe never runs alongside the standard set, so its long-lived pipe - // setup / kernel-buffer overhead does not skew the steady-state Byte[] / IBufferWriter measurements. - string layer; - var opMode = "all"; - var serializerMode = "standard"; + // CLI mode (args provided): run once, parse args, exit. Backward-compatible behaviour. + if (args.Length > 0) + { + if (!TryParseCliArgs(args, out var layer, out var opMode, out var serializerMode)) + return; // profiler mode (already ran) or invalid args + RunBenchmark(layer, opMode, serializerMode); + return; + } - if (args.Length == 0) + // Interactive mode (no args): loop the menu so the user doesn't have to restart between runs. + // Q exits the menu (and the application). + while (true) { var selection = ShowInteractiveMenu(); if (selection == null) return; // user pressed Q - layer = selection.Value.layer; - serializerMode = selection.Value.serializerMode; + + RunBenchmark(selection.Value.layer, "all", selection.Value.serializerMode); + + System.Console.WriteLine(); + System.Console.WriteLine("─────────────────────────────────────────────────────────────────────"); + System.Console.WriteLine("Returning to menu — press any key to continue, or Q to quit..."); + var key = System.Console.ReadKey(intercept: true); + if (key.Key == ConsoleKey.Q) return; + System.Console.WriteLine(); + } + } + + /// + /// Parses CLI arguments into (layer, opMode, serializerMode). Returns false if the args + /// indicate a special mode that has already been handled (e.g. 
profiler) or are invalid; + /// the caller should then exit without running the standard benchmark. + /// + private static bool TryParseCliArgs(string[] args, out string layer, out string opMode, out string serializerMode) + { + layer = "all"; + opMode = "all"; + serializerMode = "standard"; + + var arg = args[0].ToLower(); + + // Profiler mode: warmup only, then exit (for memory profiler analysis) + if (arg == "profiler") + { + RunProfilerMode(); + return false; + } + + // Quick mode: short warmup, few iterations, small sample count + if (arg == "quick") + { + WarmupIterations = 5; + TestIterations = 100; + BenchmarkSamples = 3; + layer = "all"; + } + else if (arg is "core" or "comprehensive" or "edge" or "all") + { + layer = arg; + } + else if (arg is "asyncpipe" or "pipe") + { + // AsyncPipe-only mode: streaming I/O isolation across all test data. + layer = "all"; + serializerMode = "asyncpipe"; + } + else if (arg is "ser" or "serialize") + { + opMode = "serialize"; + layer = "all"; + } + else if (arg is "des" or "deserialize") + { + opMode = "deserialize"; + layer = "all"; } else { - var arg = args[0].ToLower(); - - // Profiler mode: warmup only, then exit (for memory profiler analysis) - if (arg == "profiler") - { - RunProfilerMode(); - return; - } - - // Quick mode: short warmup, few iterations, small sample count - if (arg == "quick") - { - WarmupIterations = 5; - TestIterations = 100; - BenchmarkSamples = 3; - layer = "all"; - } - else if (arg is "core" or "comprehensive" or "edge" or "all") - { - layer = arg; - } - else if (arg is "asyncpipe" or "pipe") - { - // AsyncPipe-only mode: streaming I/O isolation across all test data. 
- layer = "all"; - serializerMode = "asyncpipe"; - } - else if (arg is "ser" or "serialize") - { - opMode = "serialize"; - layer = "all"; - } - else if (arg is "des" or "deserialize") - { - opMode = "deserialize"; - layer = "all"; - } - else - { - // Backwards compat: unknown arg → treat as layer keyword - layer = arg; - } + // Backwards compat: unknown arg → treat as layer keyword + layer = arg; } + return true; + } + + /// + /// Runs the benchmark suite end-to-end for the given configuration: pre-warmup → per-cell warmup + /// + measurement → grouped results print → save to disk. Used by both the CLI and interactive + /// menu paths; the interactive loop calls this repeatedly without restarting the process. + /// + private static void RunBenchmark(string layer, string opMode, string serializerMode) + { System.Console.WriteLine("╔══════════════════════════════════════════════════════════════════════╗"); System.Console.WriteLine("║ COMPREHENSIVE SERIALIZER BENCHMARK SUITE ║"); System.Console.WriteLine("╚══════════════════════════════════════════════════════════════════════╝"); @@ -449,7 +488,7 @@ public static class Program // fits blocking-free in one kernel pipe-buffer slot. Single source of truth for both app-level // wire chunk AND kernel transfer unit; change ONLY this line when tuning. var binaryFastModePipeChunkOnly = AcBinarySerializerOptions.FastMode; - binaryFastModePipeChunkOnly.BufferWriterChunkSize = 4096; //AsyncPipeWriterOutput.MaxChunkSize; + binaryFastModePipeChunkOnly.BufferWriterChunkSize = PipeChunkSize; return new List { @@ -463,6 +502,19 @@ public static class Program // the chunked-row above this isolates AsyncPipe-framework-overhead (Δ vs raw) from // kernel-transport-overhead (raw vs in-process Byte[]). new AcBinaryNamedPipeRawByteArrayBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeRaw)"), + // Chunked-framed AsyncPipe over an IN-MEMORY System.IO.Pipelines.Pipe (NO NamedPipe, NO kernel). 
+ // Same chunked-streaming code path (SerializeChunkedFramed → AsyncPipeReaderInput) but with the + // kernel-pipe replaced by a managed-only Pipe. Eliminates per-chunk syscall overhead (~30 µs/chunk + // on NamedPipe → ~1-2 µs/chunk on in-memory Pipe). Side-by-side with the NamedPipe row above this + // isolates pure CPU cost of the chunked-streaming framework (vs kernel-pipe transport cost) — the + // in-memory Pipe row should be much closer to the raw-byte[] row, validating that NamedPipe loopback + // is the worst-case benchmark scenario for chunked-streaming and not representative of real network + // / file / cross-thread Pipe scenarios. + new AcBinaryInMemoryPipeBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeChunk)"), + // Raw byte[] over IN-MEMORY direct cross-thread handoff (no transport at all). Apples-to-apples + // baseline for the in-memory chunked row above: same in-memory transport (zero kernel), but raw + // byte[] vs chunked-streaming wire format. Completes the 2x2 matrix [chunked,raw] × [kernel,memory]. + new AcBinaryInMemoryRawByteArrayBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeRaw)"), }; } @@ -484,7 +536,13 @@ public static class Program // allocates a fresh ABW. Independent of the AsyncPipe profile (different mechanism: alloc overhead // vs syscall count). var binaryFastModeBufWrChunk = AcBinarySerializerOptions.FastMode; - binaryFastModeBufWrChunk.BufferWriterChunkSize = 4096; + binaryFastModeBufWrChunk.BufferWriterChunkSize = PipeChunkSize; + + // In-memory Pipe variant — same 4 KB chunkSize as the AsyncPipe mode, no kernel-pipe alignment + // concern (managed slabs are not page-aligned anyway). Drives SerializeChunkedFramed via the in-memory + // System.IO.Pipelines.Pipe (zero-copy slab handoff between producer and drain task). 
+ var binaryFastModePipeChunkInMem = AcBinarySerializerOptions.FastMode; + binaryFastModePipeChunkInMem.BufferWriterChunkSize = PipeChunkSize; var defaultOptions = AcBinarySerializerOptions.Default; defaultOptions.UseStringInterning = StringInterningMode.None; @@ -517,8 +575,20 @@ public static class Program // allocation. Optimum for this scenario. new AcBinaryFreshBufferWriterBenchmark(testData.Order, binaryFastModeBufWrChunk, "FastMode (4KB)"), - // AsyncPipe streaming (AcBinaryNamedPipeBenchmark) is intentionally OMITTED here — run it via - // the dedicated AsyncPipe menu / CLI mode for isolated streaming-I/O measurements. + // AcBinary chunked-streaming over an IN-MEMORY Pipe (no kernel transport). Side-by-side with the + // Byte[] / IBufferWriter rows above this shows the chunked-streaming framework's pure CPU cost + // (no NamedPipe loopback noise) vs the simpler in-process serialize-then-deserialize patterns. + // The IO column shows "Pipe(in-mem)" — distinct from the NamedPipe AsyncPipe rows in [P] mode. + new AcBinaryInMemoryPipeBenchmark(testData.Order, binaryFastModePipeChunkInMem, "FastMode (PipeChunk)"), + + // Raw byte[] over IN-MEMORY direct cross-thread handoff (no transport, no kernel, no Pipe). Apples-to- + // apples baseline for the in-memory chunked row above: same in-memory pattern, but raw byte[] vs + // chunked-streaming wire format. The IO column shows "Bytes(in-mem)". + new AcBinaryInMemoryRawByteArrayBenchmark(testData.Order, binaryFastModePipeChunkInMem, "FastMode (PipeRaw)"), + + // AsyncPipe streaming over kernel NamedPipe (AcBinaryNamedPipeBenchmark) is intentionally OMITTED + // here — run it via the dedicated AsyncPipe menu [P] / CLI mode for isolated kernel-transport + // measurements. // ============================================================ // MemoryPack — three I/O modes for apples-to-apples comparison @@ -724,37 +794,80 @@ public static class Program /// /// Interactive menu shown when no CLI args. 
Returns the layer keyword (core/comprehensive/edge/all) or null on Quit. + /// Loops on settings-changes ([S]) — user is returned to this menu after modifying iteration counts. /// private static (string layer, string serializerMode)? ShowInteractiveMenu() { - System.Console.WriteLine(); - System.Console.WriteLine("╔══════════════════════════════════════════════════════════╗"); - System.Console.WriteLine("║ AcBinary Benchmark Suite ║"); - System.Console.WriteLine("╚══════════════════════════════════════════════════════════╝"); - System.Console.WriteLine(); - System.Console.WriteLine("Select benchmark layer:"); - System.Console.WriteLine(); - System.Console.WriteLine(" [1] Core — daily iteration"); - System.Console.WriteLine(" [2] Comprehensive — release validation"); - System.Console.WriteLine(" [3] Edge cases — refactor verification"); - System.Console.WriteLine(" [A] All layers"); - System.Console.WriteLine(" [P] AsyncPipe — streaming I/O isolation (only AsyncPipe, all test data)"); - System.Console.WriteLine(" [Q] Quit"); - System.Console.Write("\nSelection: "); - - var key = System.Console.ReadKey(intercept: false).KeyChar; - System.Console.WriteLine(); - - return char.ToLower(key) switch + while (true) { - '1' => ("core", "standard"), - '2' => ("comprehensive", "standard"), - '3' => ("edge", "standard"), - 'a' => ("all", "standard"), - 'p' => ("all", "asyncpipe"), - 'q' => null, - _ => ("all", "standard") - }; + System.Console.WriteLine(); + System.Console.WriteLine("╔══════════════════════════════════════════════════════════╗"); + System.Console.WriteLine("║ AcBinary Benchmark Suite ║"); + System.Console.WriteLine("╚══════════════════════════════════════════════════════════╝"); + System.Console.WriteLine(); + System.Console.WriteLine("Select benchmark layer:"); + System.Console.WriteLine(); + System.Console.WriteLine(" [1] Core — daily iteration"); + System.Console.WriteLine(" [2] Comprehensive — release validation"); + System.Console.WriteLine(" [3] Edge 
cases — refactor verification"); + System.Console.WriteLine(" [A] All layers"); + System.Console.WriteLine(" [P] AsyncPipe — streaming I/O isolation (only AsyncPipe, all test data)"); + System.Console.WriteLine($" [S] Settings — modify Warmup ({WarmupIterations}) / Iterations ({TestIterations}) / Samples ({BenchmarkSamples})"); + System.Console.WriteLine(" [Q] Quit"); + System.Console.Write("\nSelection: "); + + var key = System.Console.ReadKey(intercept: false).KeyChar; + System.Console.WriteLine(); + + switch (char.ToLower(key)) + { + case '1': return ("core", "standard"); + case '2': return ("comprehensive", "standard"); + case '3': return ("edge", "standard"); + case 'a': return ("all", "standard"); + case 'p': return ("all", "asyncpipe"); + case 's': + ShowSettingsMenu(); + continue; // re-display the main menu after settings update + case 'q': return null; + default: return ("all", "standard"); + } + } + } + + /// + /// Settings sub-menu — prompts for Warmup / Iterations / Samples values. Empty input keeps the + /// current value. Validation: WarmupIterations ≥ 0; TestIterations ≥ 1; BenchmarkSamples ≥ 1. + /// Returns to the caller (which re-displays the main menu). 
+ /// + private static void ShowSettingsMenu() + { + System.Console.WriteLine(); + System.Console.WriteLine("─────────────────────────────────────────────"); + System.Console.WriteLine("Settings — press Enter to keep current value"); + System.Console.WriteLine("─────────────────────────────────────────────"); + System.Console.WriteLine(); + + WarmupIterations = PromptInt("WarmupIterations", WarmupIterations, min: 0); + TestIterations = PromptInt("TestIterations ", TestIterations, min: 1); + BenchmarkSamples = PromptInt("BenchmarkSamples", BenchmarkSamples, min: 1); + + System.Console.WriteLine(); + System.Console.WriteLine($"✓ Settings updated: Warmup={WarmupIterations} | Iterations={TestIterations} | Samples={BenchmarkSamples}"); + } + + /// + /// Prompts the user for an integer with a default (current value). Returns the current value if + /// the user presses Enter on empty input or if parsing fails / value is below the minimum. + /// + private static int PromptInt(string name, int currentValue, int min) + { + System.Console.Write($" {name} [{currentValue}]: "); + var input = System.Console.ReadLine()?.Trim() ?? ""; + if (input.Length == 0) return currentValue; + if (int.TryParse(input, out var newValue) && newValue >= min) return newValue; + System.Console.WriteLine($" ! Invalid value (need int ≥ {min}) — keeping {currentValue}"); + return currentValue; } /// @@ -1264,6 +1377,185 @@ public static class Program } } + /// + /// Same chunked-framed AsyncPipe code path as , but the transport + /// is an in-memory instead of a kernel NamedPipe. The Pipe's + /// Writer/Reader pair is a managed-only zero-copy slab handoff — no syscalls, no kernel + /// buffer copy, no IRP queueing. 
+ /// + /// Why this benchmark matters: by holding ALL other variables constant (same SerializeChunkedFramed, + /// same AsyncPipeReaderInput, same drain task, same consumer task, same multi-message wire format), this + /// row isolates the kernel-NamedPipe transport overhead from the chunked-streaming framework's pure + /// CPU cost. The expected delta vs AcBinaryNamedPipeBenchmark: per-chunk overhead drops from + /// ~25-30 µs (kernel-syscall pair + IRP) to ~1-2 µs (managed slab handoff). Multi-chunk Large-message rows + /// should converge dramatically toward the raw-byte[] row. + /// + /// Real-world relevance: in-memory Pipe is the typical primitive used for cross-thread serializer + /// pipelines inside a single process (e.g. SignalR's Kestrel transport adapter, gRPC framework internals, + /// custom message brokers). The numbers from this row reflect that scenario, NOT the kernel-pipe loopback + /// of the NamedPipe benchmark. + /// + private sealed class AcBinaryInMemoryPipeBenchmark : ISerializerBenchmark, IDisposable + { + private readonly TestOrder _order; + private readonly AcBinarySerializerOptions _options; + private readonly byte[] _serialized; // for SerializedSize reporting only + + // Long-lived in-memory pipe lifecycle (set up once in ctor — NOT timed). + private readonly Pipe _pipe; + private readonly PipeWriter _pipeWriter; + private readonly PipeReader _pipeReader; + + // Long-lived multi-message receive infrastructure (set up once in ctor) — same pattern as the NamedPipe + // variant: drain pumps reader into AsyncPipeReaderInput, consumer task drives Deserialize(input). + private readonly AsyncPipeReaderInput _input; + private readonly CancellationTokenSource _cts; + private readonly Task _drainTask; + private readonly Task _consumerTask; + private readonly ManualResetEventSlim _consumeRequest = new(false); + private readonly ManualResetEventSlim _consumeDone = new(false); + private object? 
_lastResult; + private bool _captureResult; + private bool _disposed; + + public string Engine => EngineAcBinary; + public string IoMode => IoInMemoryPipe; + public string DispatchMode => _options.UseGeneratedCode ? ModeSGen : ModeRuntime; + public string OptionsPreset { get; } + public int SerializedSize => _serialized.Length; + public long SetupSerializeAllocBytes { get; } + public long SetupDeserializeAllocBytes { get; } + public bool IsRoundTripOnly => true; + public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=Pipe(in-memory,multiMessage,2-task)"); + + public AcBinaryInMemoryPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) + { + _order = order; + _options = options; + OptionsPreset = optionsPreset; + + _serialized = AcBinarySerializer.Serialize(order, _options); + + // === SERIALIZE-side setup measurement === + // In-memory Pipe construction. NO kernel-pipe pair, NO Connect handshake — just a managed Pipe object + // and a reference to its Writer side. PipeWriterImpl (parallel-flush capable, NOT StreamPipeWriter). + GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); + var beforeSer = GC.GetAllocatedBytesForCurrentThread(); + _pipe = new Pipe(); + _pipeWriter = _pipe.Writer; + var afterSer = GC.GetAllocatedBytesForCurrentThread(); + SetupSerializeAllocBytes = afterSer - beforeSer; + + // === DESERIALIZE-side setup measurement === + // PipeReader reference + AsyncPipeReaderInput (ArrayPool rent + ManualResetEventSlim) + drain task + + // consumer task scaffolding. Identical to the NamedPipe variant on the receive side. 
+ GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); + var beforeDes = GC.GetAllocatedBytesForCurrentThread(); + _pipeReader = _pipe.Reader; + _input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true); + _cts = new CancellationTokenSource(); + _drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token)); + _consumerTask = Task.Run(ConsumeLoop); + var afterDes = GC.GetAllocatedBytesForCurrentThread(); + SetupDeserializeAllocBytes = afterDes - beforeDes; + } + + // BG consumer: parks on _consumeRequest, runs Deserialize(_input) when signaled, signals _consumeDone. + // Mirror of AcBinaryNamedPipeBenchmark.ConsumeLoop — same pattern, same MRES protocol. + private void ConsumeLoop() + { + var ct = _cts.Token; + try + { + while (true) + { + _consumeRequest.Wait(ct); + if (ct.IsCancellationRequested) return; + _consumeRequest.Reset(); + + try + { + var result = AcBinaryDeserializer.Deserialize(_input, _options); + if (_captureResult) _lastResult = result; + } + catch + { + // Swallow — see ConsumeLoop in NamedPipe variant for rationale. + } + finally + { + _consumeDone.Set(); + } + } + } + catch (OperationCanceledException) + { + // Cooperative cancel — Dispose path. Swallow. + } + } + + public void Warmup(int iterations) + { + for (var i = 0; i < iterations; i++) Serialize(); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Serialize() + { + // Same 2-task streaming pipeline as NamedPipe variant — only the transport differs (in-memory Pipe + // instead of kernel NamedPipe). Per-chunk SerializeChunkedFramed → PipeWriter slab → drain task + // reads from PipeReader → input.Feed → consumer Deserialize consumes byte-by-byte. 
+ _consumeDone.Reset(); + _consumeRequest.Set(); + + AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options); + + _consumeDone.Wait(); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Deserialize() + { + // No-op: per-iter round-trip is captured in Serialize(). See IsRoundTripOnly contract. + } + + public bool VerifyRoundTrip() + { + _captureResult = true; + try + { + Serialize(); + var result = _lastResult as TestOrder; + return result != null && DeepEqualsViaJson(_order, result); + } + finally + { + _captureResult = false; + _lastResult = null; + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + // Cancel drain + consumer tasks → both exit. Pulse _consumeRequest in case consumer is parked. + try { _cts.Cancel(); } catch { /* swallow on teardown */ } + try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ } + try { _drainTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } + try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } + + // Complete writer + reader (in-memory Pipe — no underlying stream to dispose). + try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } + try { _pipeReader.Complete(); } catch { /* swallow on teardown */ } + try { _input.Dispose(); } catch { /* swallow on teardown */ } + try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ } + try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ } + try { _cts.Dispose(); } catch { /* swallow on teardown */ } + } + } + /// /// Raw byte[] over a long-lived NamedPipe — NO chunk-framing, NO AsyncPipeReaderInput, /// NO sliding-window buffer. 
Calling thread serialises + writes; a long-lived background consumer task @@ -1479,6 +1771,174 @@ public static class Program } } + /// + /// Raw byte[] over an in-memory cross-thread handoff — NO transport (no NamedPipe, no Pipe, no + /// Channel). Calling thread serialises into a fresh byte[], hands it to a + /// background consumer task via a single byte[] slot + MRES pair; the consumer deserialises and signals done. + /// + /// Why this benchmark matters: completes the 2x2 transport × wire-format matrix: + /// + /// NamedPipe + Chunked = AcBinaryNamedPipeBenchmark + /// NamedPipe + Raw = AcBinaryNamedPipeRawByteArrayBenchmark + /// In-memory Pipe + Chunked = AcBinaryInMemoryPipeBenchmark + /// In-memory + Raw = THIS row — apples-to-apples baseline for the in-memory chunked row + /// + /// Side-by-side with AcBinaryInMemoryPipeBenchmark this isolates the chunked-streaming + /// framework's pure CPU cost, with the same in-memory transport (zero kernel involvement) on both sides. + /// Side-by-side with AcBinaryNamedPipeRawByteArrayBenchmark this isolates the kernel-NamedPipe + /// overhead on the raw-byte[] side. + /// + private sealed class AcBinaryInMemoryRawByteArrayBenchmark : ISerializerBenchmark, IDisposable + { + private readonly TestOrder _order; + private readonly AcBinarySerializerOptions _options; + private readonly byte[] _serialized; // for SerializedSize reporting only + + // Long-lived consumer-task infrastructure (Deserialize on BG thread, signaled per iter). + // No transport — just a byte[] slot for handoff between calling thread and consumer task. + private readonly CancellationTokenSource _cts; + private readonly Task _consumerTask; + private readonly ManualResetEventSlim _consumeRequest = new(false); + private readonly ManualResetEventSlim _consumeDone = new(false); + private byte[]? _pendingBytes; // calling thread → consumer task handoff slot + private object? 
_lastResult; // captured during VerifyRoundTrip; null in benchmark iters + private bool _captureResult; + private bool _disposed; + + public string Engine => EngineAcBinary; + public string IoMode => IoInMemoryRaw; + public string DispatchMode => _options.UseGeneratedCode ? ModeSGen : ModeRuntime; + public string OptionsPreset { get; } + public int SerializedSize => _serialized.Length; + public long SetupSerializeAllocBytes { get; } + public long SetupDeserializeAllocBytes { get; } + public bool IsRoundTripOnly => true; + public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=in-memory(raw,2-task)"); + + public AcBinaryInMemoryRawByteArrayBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) + { + _order = order; + _options = options; + OptionsPreset = optionsPreset; + + _serialized = AcBinarySerializer.Serialize(order, _options); + + // === SERIALIZE-side setup measurement === + // Nothing to set up — calling thread allocates byte[] per iter via AcBinarySerializer.Serialize. + SetupSerializeAllocBytes = 0; + + // === DESERIALIZE-side setup measurement === + // 1× background consumer-task + 2× MRES (request / done) + cancellation source. + GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); + var beforeDes = GC.GetAllocatedBytesForCurrentThread(); + _cts = new CancellationTokenSource(); + _consumerTask = Task.Run(ConsumerLoop); + var afterDes = GC.GetAllocatedBytesForCurrentThread(); + SetupDeserializeAllocBytes = afterDes - beforeDes; + } + + // BG consumer: parks on _consumeRequest, picks up the byte[] from _pendingBytes, runs Deserialize(bytes), + // signals _consumeDone. Direct in-process handoff — no transport syscall, no buffer copy beyond the byte[] + // reference itself (zero-copy by reference). 
+ private void ConsumerLoop() + { + var ct = _cts.Token; + try + { + while (true) + { + _consumeRequest.Wait(ct); + if (ct.IsCancellationRequested) return; + _consumeRequest.Reset(); + + try + { + var bytes = _pendingBytes; + if (bytes != null) + { + var result = AcBinaryDeserializer.Deserialize(bytes, _options); + if (_captureResult) _lastResult = result; + } + } + catch + { + // Swallow — see ConsumerLoop in NamedPipe variant for rationale. + } + finally + { + _consumeDone.Set(); + } + } + } + catch (OperationCanceledException) + { + // Cooperative cancel — Dispose path. Swallow. + } + } + + public void Warmup(int iterations) + { + for (var i = 0; i < iterations; i++) Serialize(); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Serialize() + { + // 2-task in-memory pipeline: + // 1. Calling thread serialises → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark contract). + // 2. Calling thread parks the byte[] into _pendingBytes and signals consumer task. Consumer task + // picks up the reference (zero-copy) and runs Deserialize(bytes). + // 3. Calling thread waits for _consumeDone (consumer task finished Des). + // + // Same architectural limitation as the NamedPipe-raw variant: Des cannot start until full bytes + // are available. Only the per-iter Ser↔Des thread-handoff overlaps slightly (calling thread starts + // signalling and waiting while consumer thread takes the byte[]). + var bytes = AcBinarySerializer.Serialize(_order, _options); + + _pendingBytes = bytes; + _consumeDone.Reset(); + _consumeRequest.Set(); + + _consumeDone.Wait(); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Deserialize() + { + // No-op: per-iter round-trip is captured in Serialize(). See IsRoundTripOnly contract. 
+ } + + public bool VerifyRoundTrip() + { + _captureResult = true; + try + { + Serialize(); + var result = _lastResult as TestOrder; + return result != null && DeepEqualsViaJson(_order, result); + } + finally + { + _captureResult = false; + _lastResult = null; + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + try { _cts.Cancel(); } catch { /* swallow on teardown */ } + try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ } + try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } + + try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ } + try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ } + try { _cts.Dispose(); } catch { /* swallow on teardown */ } + } + } + /// /// Benchmarks MemoryPack via the IBufferWriter overload, allocating a FRESH ArrayBufferWriter on EVERY call. /// Apples-to-apples counterpart to AcBinaryFreshBufferWriterBenchmark. diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs index 104f648..cbef857 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs @@ -14,10 +14,11 @@ namespace AyCode.Core.Tests.Serialization; /// the tests own the / /// lifecycle directly and call the generic /// + -/// primitives. This proves -/// the streaming framework works on arbitrary PipeWriter/PipeReader sources -/// (NamedPipe, FileStream, NetworkStream, custom transports) without per-transport adapters in -/// the framework. +/// +/// primitives, with the receive-side drain implemented via the test-only +/// extension. This proves the streaming +/// framework works on arbitrary PipeWriter/PipeReader sources (NamedPipe, FileStream, +/// NetworkStream, custom transports) without per-transport adapters in the framework. 
/// /// With BufferWriterChunkSize = 256, even small test payloads cross multiple chunk /// boundaries on the wire — exercises the real chunking + sliding-window cycling behavior. @@ -104,8 +105,10 @@ public class AcBinarySerializerNamedPipeTests /// /// Owns the full NamedPipe lifecycle: binds server, accepts connect, drives the generic /// on - /// the client side and - /// on the server side. The framework helpers know nothing about NamedPipe — only PipeWriter / + /// the client side, and on the server side runs the canonical drain+deserialize pair + /// (test-only on the calling thread, + /// + /// on a Task.Run BG thread). The framework helpers know nothing about NamedPipe — only PipeWriter / /// PipeReader. /// private static async Task RunNamedPipeRoundTripAsync(string pipeName, T original, AcBinarySerializerOptions opts) @@ -119,7 +122,12 @@ public class AcBinarySerializerNamedPipeTests await pipeServer.WaitForConnectionAsync().ConfigureAwait(false); var pipeReader = PipeReader.Create(pipeServer); - return await AcBinaryDeserializer.DeserializeFromPipeReaderAsync(pipeReader, opts).ConfigureAwait(false); + // Inlined version of what the removed DeserializeFromPipeReaderAsync used to do: + // single-message mode + drain on calling thread + deserialize on Task.Run BG. 
+ using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2, multiMessage: false); + var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, opts)); + await input.DrainFromAsync(pipeReader).ConfigureAwait(false); + return await deserTask.ConfigureAwait(false); }); await using var pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous); diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs b/AyCode.Core.Tests/Serialization/AsyncPipeReaderInputExtensions.cs similarity index 64% rename from AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs rename to AyCode.Core.Tests/Serialization/AsyncPipeReaderInputExtensions.cs index 06e942a..c289081 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs +++ b/AyCode.Core.Tests/Serialization/AsyncPipeReaderInputExtensions.cs @@ -1,18 +1,22 @@ +using AyCode.Core.Serializers.Binaries; using System; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; -namespace AyCode.Core.Serializers.Binaries; +namespace AyCode.Core.Tests.Serialization; /// -/// Extension methods for populating from -/// -backed transports (NamedPipe, FileStream, +/// Test/benchmark-only extension methods for populating +/// from -backed transports (NamedPipe, FileStream, /// custom pipe sources). /// -/// Lives in a separate file from the core class so does not -/// import System.IO.Pipelines in its primary surface — the optional pull-mode is visible -/// at use-sites (per ADR-0003 Decision §3 at docs/adr/0003-acbinary-streaming-receive-architecture.md). +/// Why test-only: in real production, the consuming application already has its own +/// reader-task that reads from the pipe and pushes bytes via AsyncPipeReaderInput.Feed +/// — providing this drain extension publicly would duplicate that responsibility and confuse +/// the canonical push-pattern. 
The extension is kept here for unit-test scaffolding and the +/// streaming benchmark; production NuGet consumers should write their own drain logic in their +/// own reader-task following the application's threading model. /// public static class AsyncPipeReaderInputExtensions { @@ -21,9 +25,9 @@ public static class AsyncPipeReaderInputExtensions /// calls on each segment and /// when the pipe completes. /// - /// Typical usage: NamedPipe IPC and FileStream-via-PipeReader transports schedule this - /// on a background task while the deserialization context reads from the same input on - /// another thread. + /// Typical usage (test-only): NamedPipe IPC and FileStream-via-PipeReader transports + /// schedule this on a background task while the deserialization context reads from the same + /// input on another thread. /// /// is invoked in a finally block — /// ensures the consumer always wakes up even if the pipe read throws or the operation is diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index f98936a..d759ff7 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -337,52 +337,6 @@ public static partial class AcBinaryDeserializer } } - /// - /// Deserialize from a with full streaming pipeline - /// parallelism — drains the reader on the calling thread, while a background Task.Run - /// deserializes incrementally from the same shared . - /// - /// Transport-agnostic: works with any PipeReader source — NamedPipe IPC - /// (PipeReader.Create(namedPipeServerStream)), file-stream - /// (PipeReader.Create(fileStream)), TCP (PipeReader.Create(networkStream)), - /// or custom PipeReader implementations. Reads raw AcBinary bytes verbatim from - /// the pipe — no wire-format unwrapping. 
Pair with the producer-side - /// - /// (or its overload), which writes the same raw byte - /// stream as 's - /// byte[] output. - /// - /// Receive buffer initial capacity is derived from options.BufferWriterChunkSize × 2 - /// — two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the - /// message's lifetime regardless of total payload size. - /// - /// For the multiplexed wire format (per-chunk [201][UINT16][data] headers, - /// produced by SerializeChunkedFramed or SignalR's AsyncSegment mode): the parser - /// strips framing on its own (e.g. AcBinaryHubProtocol.TryParseChunkData) and feeds - /// only the data bytes here. - /// - /// Source pipe reader. Caller owns lifecycle (creation + completion). - /// Serializer options. Defaults to . - /// BufferWriterChunkSize controls the receive-side initial buffer (× 2 headroom). - /// Cancellation token. For connect-timeout, pass the token of a - /// new CancellationTokenSource(timeout). - public static async Task DeserializeFromPipeReaderAsync(System.IO.Pipelines.PipeReader reader, AcBinarySerializerOptions? options = null, CancellationToken ct = default) - { - if (reader is null) throw new ArgumentNullException(nameof(reader)); - - var opts = options ?? AcBinarySerializerOptions.Default; - - // Single-message mode (multiMessage: false) — bytes drained from the PipeReader are forwarded - // verbatim to the deserialization buffer. Pair with AcBinarySerializer.SerializeChunked - // (raw byte stream) on the producer side; for multi-message framed wire formats the parser - // strips framing upstream and feeds only data bytes here. 
- using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2, multiMessage: false); - var deserTask = Task.Run(() => Deserialize(input, opts), ct); - - await input.DrainFromAsync(reader, ct).ConfigureAwait(false); - return await deserTask.ConfigureAwait(false); - } - /// /// Internal: Deserialize with any TInput (multi-segment or other future input types). /// diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 92a149d..82c7949 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -426,9 +426,10 @@ public static partial class AcBinarySerializer /// Serialize to a as a chunked stream — pure AcBinary /// bytes are written via in raw mode (no per-chunk header). /// The output is byte-compatible with 's - /// byte[] result; a consumer can drain pipe.Reader and feed the bytes directly to - /// (or pipe-them through DeserializeFromPipeReaderAsync) - /// with no extra parser. + /// byte[] result; a consumer drains pipe.Reader in its own reader-task and pushes + /// bytes via , then calls + /// + /// — no extra parser, no special transport adapter. /// /// Why instead of ? /// Pipe.Writer is always the BCL PipeWriterImpl, which is parallel-capable diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs index 53f9b22..f251dfc 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs @@ -24,15 +24,49 @@ namespace AyCode.Core.Serializers.Binaries; /// verbatim (matches AcBinarySerializer.SerializeChunked raw output drained from a /// ); single-message scenario, no auto-reset. /// -/// Usage modes: +/// Usage: push pattern only. 
The consumer's reader-task reads bytes from any +/// underlying transport (the framework knows nothing about which) and pushes them via +/// ; a separate consumer thread (or task) calls +/// . +/// The framework does NOT own the transport — the consumer's reader-task does, following the +/// application's threading model. +/// +/// When chunked-streaming is the right fit (vs raw byte[] / +/// ): /// -/// Push (Feed-API): producer thread calls with chunk bytes -/// (typical for SignalR TryParseChunkData). -/// Pull (DrainFromAsync extension): helper drains a -/// into the input via repeated -/// calls (typical for NamedPipe / FileStream / NetworkStream). +/// Network transports — TCP / UDP / WebSocket / SSE / HTTP/2 streams. Per-chunk +/// CPU overhead (~30 µs / chunk) is invisible next to ms-scale RTT; the streaming +/// pipeline lets sender, transport, and receiver work in parallel on different parts of +/// the message. +/// Multi-connection servers — Kestrel-style (SignalR), gRPC servers, custom RPC +/// hosts. Per-connection peak memory bounded by buffer-size (e.g. 32 KB), not by max +/// message size — 1000 concurrent connections × 1 MB messages = 32 MB peak (vs 1 GB +/// with raw byte[]). LOH allocation pressure (≥ 85 KB messages) is also avoided. +/// Message brokers / queues — Kafka / Redis Streams / Azure Service Bus clients +/// that expose sinks. Streaming serialize +/// writes directly into the transport buffer — no intermediate byte[] allocation. +/// File streamingFileStream behind a +/// . 100 MB+ payloads from disk with constant +/// 32 KB peak memory. +/// In-memory cross-thread handoff — +/// producer + consumer threads coordinate over a shared Pipe; zero-copy slab handoff. +/// Custom transport adapters — anything where the consumer wants to push bytes +/// from a transport-specific reader-task. 
/// /// +/// When raw byte[] is the right fit: same-process loopback IPC where transport +/// latency is near zero, single-producer/single-consumer batch operations where peak memory is +/// not a constraint, sub-LOH messages (< 85 KB) with no GC-pressure concerns. The chunked-streaming +/// per-chunk CPU overhead is fully visible in these scenarios — raw is faster end-to-end. +/// +/// Performance characteristic: per-chunk overhead is roughly constant (~25-30 µs — +/// FlushAsync syscall + ReadAsync syscall + framing-parse + sliding-window bookkeeping). Total +/// chunk-overhead = (messageSize / chunkSize) × ~30 µs. The streaming benefit is pipeline +/// parallelism + bounded peak memory — both of which require a non-trivial transport stage to +/// surface (network, file, cross-thread queue). On same-process loopback NamedPipe (the worst-case +/// benchmark scenario), the per-chunk cost dominates and chunked appears slower than raw — this +/// is a benchmark-artifact, not the production characteristic. +/// /// Backed by a single contiguous byte[] from . Positions reset /// to 0 when the consumer catches up (sliding-window cycling — peak buffer memory bounded by /// chunk size, NOT message size). Grow is the absolute last resort and practically never fires @@ -312,8 +346,8 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable } /// - /// Whether has been called (or 's underlying - /// stream signalled EOF and the finally block closed the input). Once true, the session + /// Whether has been called (typically by the consumer's reader-task + /// finally-block after the underlying transport signals EOF). Once true, the session /// has ended — any pending /// call returns whatever partial buffer is left, and subsequent calls return immediately. 
/// diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index 900476a..d8320a2 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -53,7 +53,33 @@ namespace AyCode.Core.Serializers.Binaries; /// passes 10 s from its options). A /// propagates to the caller, allowing the connection to abort instead of blocking forever. /// -/// Maximum chunk data size (in framed mode): 65535 bytes (UINT16 max). +/// Maximum chunk data size (in framed mode): 65535 bytes (UINT16 max). +/// +/// When chunked-streaming is the right fit (vs raw byte[] output via +/// ): +/// +/// Network transports — TCP / UDP / WebSocket / SSE / HTTP/2 streams. Per-chunk +/// CPU overhead is invisible next to ms-scale RTT; pipeline parallelism lets sender, +/// transport, and receiver work in parallel on different parts of the message. +/// Multi-connection servers — Kestrel-style (SignalR), gRPC / proprietary RPC. +/// Per-connection peak memory bounded by chunk-size; LOH allocation pressure (≥ 85 KB +/// messages) is avoided. +/// Message brokers / queues — Kafka / Redis Streams / Service Bus clients exposing +/// or PipeWriter sinks. Streaming +/// serialize writes directly into the transport buffer. +/// File streamingFileStream-backed . +/// 100 MB+ payloads to disk with constant peak memory. +/// In-memory cross-thread — producer thread +/// serialises while consumer thread deserialises in parallel. +/// Custom transport adapters — anything where the application owns a +/// PipeWriter or IBufferWriter sink and wants incremental output. +/// +/// +/// When raw byte[] output is the right fit: same-process loopback IPC where +/// transport latency is near zero, single batch-style operations where peak memory is not a +/// constraint, sub-LOH messages (< 85 KB) with no GC-pressure concerns. 
The chunked-streaming +/// per-chunk CPU overhead is fully visible in these scenarios — raw is faster end-to-end. Pick the +/// chunked path only when the transport stage is non-trivial (network, file, cross-thread queue). /// public struct AsyncPipeWriterOutput : IBinaryOutputBase { diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md index 761426f..824de35 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md @@ -19,16 +19,24 @@ Existing `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` remain uncha - New class compiles; isolated unit tests cover `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts (incl. producer-consumer concurrency, missed-signal double-check, grow-buffer handoff race). - Existing SignalR tests continue to pass on the unchanged `SegmentBufferReader` path (no behavioral regression). -## ACCORE-BIN-T-M2K1: Add `AsyncPipeReaderInput.DrainFromAsync` extension (Step 2 of ADR-0003) -**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 2 · **Depends on:** `ACCORE-BIN-T-D6H4` +## ACCORE-BIN-T-M2K1: ~~Add `AsyncPipeReaderInput.DrainFromAsync` extension~~ (Step 2 of ADR-0003) +**Status:** Closed (2026-05-02) — moved to test-only assembly · **Priority:** ~~P1~~ · **Type:** ~~Feature~~ · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 2 · **Depends on:** `ACCORE-BIN-T-D6H4` -Add `public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken ct)` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs` (NEW file). 
Pulls from a `System.IO.Pipelines.PipeReader` and feeds the input via repeated `Feed(span)` calls; calls `Complete()` at end-of-stream. +~~Add `public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken ct)` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs` (NEW file).~~ -Separate file (not a method on the class) so the core `AsyncPipeReaderInput.cs` does not import `System.IO.Pipelines` in its primary contract — the pull-mode is opt-in at use-sites. +### Resolution (2026-05-02) -**Acceptance:** -- Extension drains an in-memory `Pipe` end-to-end in a unit test (write some bytes → DrainFromAsync → assert AsyncPipeReaderInput state). -- `Complete()` correctly invoked at end-of-stream (`result.IsCompleted`). +The drain-extension was originally added as a public framework helper, paired with the public `AcBinaryDeserializer.DeserializeFromPipeReaderAsync` method. Subsequent design review concluded that **both** are unnecessary on the public NuGet API surface: + +- **Real production consumers** always have their own reader-task that owns the pipe-lifecycle and threading model (SignalR → Kestrel; NamedPipe IPC → consumer's connection loop; FileStream → consumer's read loop). They naturally push bytes via `AsyncPipeReaderInput.Feed` from their own task. +- **A framework-provided `DrainFromAsync` / `DeserializeFromPipeReaderAsync`** would either duplicate that responsibility or impose a specific threading model on consumers. Either is bad NuGet API design. + +**Resolution actions:** +- `AsyncPipeReaderInputExtensions` (containing `DrainFromAsync`) **moved** from `AyCode.Core/Serializers/Binaries/` → `AyCode.Core.Tests/Serialization/` (namespace `AyCode.Core.Tests.Serialization`). Test-only assembly; the streaming benchmark project references the Tests project to access it. +- `AcBinaryDeserializer.DeserializeFromPipeReaderAsync` overloads **removed** from public API. 
Tests that previously called it now inline the canonical drain+deserialize pair (`Task.Run(() => Deserialize(input))` + `await input.DrainFromAsync(reader)`) so the test assertion still exercises the same code paths. +- `AsyncPipeReaderInput` and `AcBinarySerializer` doc-comments updated to remove references to `DrainFromAsync` and `DeserializeFromPipeReaderAsync` from the public-facing pattern docs; the canonical recommended pattern is "consumer's reader-task pushes via Feed." + +**Acceptance:** existing `AcBinarySerializerPipeParallelTests` and `AcBinarySerializerNamedPipeTests` continue to pass on the new test-only extension; benchmark (`Console.AsyncPipe` mode) continues to work. ## ACCORE-BIN-T-V7C9: Replace misleading parallel test with real parallel pipeline test (Step 3 of ADR-0003) **Priority:** P1 · **Type:** Test · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 3 · **Depends on:** `ACCORE-BIN-T-M2K1` @@ -130,9 +138,17 @@ Possible directions for a failover / reconnect scenario (none committed): Each has different thread-safety implications and wire-formatting reentrancy requirements; explore before any code. 
## ACCORE-BIN-T-S5N2: Pattern catalogue in the public class summary -**Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface) +**Status:** Partially Resolved (2026-05-02) · **Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface) -Extend the `AsyncPipeReaderInput` class summary with an explicit pattern catalogue: each supported producer-consumer pattern (strictly-sequential single-thread, multi-thread feed+deserialize, push/event-driven, one-shot) with its recommended API (low-level Feed/Initialize/MessageDone vs. any higher-level convenience wrapper). NuGet consumers can self-select the pattern that matches their threading model. +Original scope: extend the `AsyncPipeReaderInput` class summary with an explicit producer-consumer pattern catalogue (strictly-sequential single-thread, multi-thread feed+deserialize, push/event-driven, one-shot) with recommended API per pattern. + +### Resolution (2026-05-02): broader scope captured + +Both `AsyncPipeReaderInput` (reader-side) and `AsyncPipeWriterOutput` (writer-side) class summaries now contain a **"When chunked-streaming is the right fit"** catalogue covering the realistic transport scenarios — not just SignalR / NamedPipe-only. The list explicitly enumerates: network transports (TCP/UDP/WebSocket/SSE/HTTP/2), multi-connection servers (SignalR/gRPC/proprietary RPC), message brokers (Kafka/Redis Streams/Service Bus), file streaming (`FileStream`-backed `PipeReader`/`PipeWriter`), in-memory cross-thread `Pipe`, and custom transport adapters. 
The complementary "When raw `byte[]` is the right fit" section calls out the loopback-IPC / sub-LOH-message / no-GC-pressure case where chunked overhead is visible without commensurate benefit. + +The single-class API surface reflects **push pattern only** (the dominant pattern across all listed use cases); the more granular threading-model patterns from the original S5N2 scope (single-thread vs multi-thread feed+deserialize) are now covered by the **explicit "consumer's reader-task drives the transport" framing** in the class summary — applications choose their threading model, the input class is agnostic. + +**Open follow-up**: a dedicated `BINARY/CHOOSING_API.md` (or a section in `BINARY/README.md`) cross-cutting the `byte[]` vs `IBufferWriter` vs `AsyncPipeReaderInput`/`AsyncPipeWriterOutput` decision could lift this from the class XML-doc to navigable doc-folder content. Tracked separately if/when a NuGet consumer hits the discoverability gap. ## ACCORE-BIN-T-G3W8: Transport-buffer alignment best-practice doc section **Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2-chunk-on-wire-size--chunksize--headersize-caused-page-fragmentation)