From a537f18294acba65a1eaceb3f0a0e2f3b56cac98 Mon Sep 17 00:00:00 2001 From: Loretta Date: Sat, 2 May 2026 00:03:22 +0200 Subject: [PATCH] [LOADED_DOCS: 3 files, no new loads] Add raw NamedPipe benchmark & mux-mode AsyncPipe docs - Add AcBinaryNamedPipeRawByteArrayBenchmark for raw NamedPipe (no chunking) to isolate kernel vs. AsyncPipe overhead - Refactor progress reporting with in-place updates for all timed/allocation benchmarks - Document [0xC8] marker as mux-mode direction; add ACCORE-BIN-T-M2X7 and ACCORE-BIN-I-C4N7 for multi-stream and single-consumer constraints - Expand BINARY_WRITERS.md with parallel-flush regime analysis and allocation context - Improve result comparison robustness for AsyncPipe-only runs - Minor doc clarifications and explicit AsyncPipeReaderInput usage patterns --- AyCode.Core.Serializers.Console/Program.cs | 357 ++++++++++++++++-- .../docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md | 26 +- .../docs/BINARY/BINARY_ASYNCPIPE_TODO.md | 55 +++ AyCode.Core/docs/BINARY/BINARY_WRITERS.md | 19 + 4 files changed, 430 insertions(+), 27 deletions(-) diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index a93924e..4a9a7fb 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -37,6 +37,16 @@ public static class Program private const string BuildConfiguration = "Release"; #endif +#if DEBUG + private static int WarmupIterations = 0; + private static int TestIterations = 1; + private static int BenchmarkSamples = 1; // Debug: single sample, fast iteration +#else + private static int WarmupIterations = 5000; //5000 + private static int TestIterations = 1000; //1000 + private static int BenchmarkSamples = 3; +#endif + // Serializer name constants // Engine identifiers (used in Engine column + comparison logic) private const string EngineAcBinary = "AcBinary"; @@ -50,6 +60,7 @@ public static class Program private const string IoBufWrNew = "BufWr new"; private const string IoString = "String"; private const string IoNamedPipe = "NamedPipe"; + private const string IoNamedPipeRaw = "NamedPipe"; // Dispatch mode identifiers — describes how property access / type dispatch happens for a given run. // SGen = compile-time source generator path (Unsafe.As direct fields, slot-array wrapper lookup). @@ -132,16 +143,6 @@ public static class Program [MethodImpl(MethodImplOptions.AggressiveInlining)] private static double ToKilobytes(long bytes) => bytes / 1024.0; - #if DEBUG - private static int WarmupIterations = 0; - private static int TestIterations = 1; - private static int BenchmarkSamples = 1; // Debug: single sample, fast iteration - #else - private static int WarmupIterations = 5000; //5000 - private static int TestIterations = 1000; //1000 - private static int BenchmarkSamples = 3; - #endif - public static void Main(string[] args) { // Set console encoding to UTF-8 for proper Unicode character display @@ -385,6 +386,10 @@ public static class Program IsRoundTripOnly = serializer.IsRoundTripOnly }; + // Group label for in-place \r progress. Identifies (cell × serializer) so a stuck benchmark + // is visibly stuck on a specific row at a specific %% rather than silently hanging. + var groupLabel = $"{result.SerializerName}"; + if (serializer.IsRoundTripOnly) { // Round-trip-only benchmarks (NamedPipe etc.): measure the full pipe round-trip directly into the RT @@ -393,8 +398,8 @@ public static class Program // also show up — otherwise current-thread alloc would only count the client side and look ~halved. if (mode is "all" or "serialize" or "ser") { - result.RoundTripTimeMs = RunTimed(() => serializer.Serialize(), TestIterations); - result.RoundTripAllocBytesPerOp = MeasureAllocationTotal(() => serializer.Serialize(), TestIterations); + result.RoundTripTimeMs = RunTimed(() => serializer.Serialize(), TestIterations, $"{groupLabel} [RT timing]"); + result.RoundTripAllocBytesPerOp = MeasureAllocationTotal(() => serializer.Serialize(), TestIterations, $"{groupLabel} [RT alloc]"); } // mode == "deserialize" alone is meaningless for a round-trip-only benchmark; skip silently. } @@ -402,15 +407,15 @@ public static class Program { if (mode is "all" or "serialize" or "ser") { - result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations); + result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations, $"{groupLabel} [Ser timing]"); // Dedicated alloc-only sample (separate from timing samples; keeps timing pure) - result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations); + result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations, $"{groupLabel} [Ser alloc]"); } if (mode is "all" or "deserialize" or "des") { - result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations); - result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations); + result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations, $"{groupLabel} [Des timing]"); + result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations, $"{groupLabel} [Des alloc]"); } // Compose RT from Ser+Des (the previously computed property's behavior, now explicit since RT is settable). @@ -448,7 +453,16 @@ public static class Program return new List { + // Chunked-framed AsyncPipe: SerializeChunkedFramed + AsyncPipeReaderInput.DrainFromAsync. + // Measures the FULL streaming-I/O stack — wire framing + drain task + sliding-window buffer + + // MRES wait-on-byte-shortage — over a kernel NamedPipe. new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeChunk)"), + // Raw byte[] over NamedPipe (sync receive, no chunk-framing). Same kernel-pipe transport, + // same inBufferSize, but: serialize → byte[] → Stream.Write → Stream.Read → Deserialize(byte[]). + // No drain task, no AsyncPipeReaderInput, no [201][UINT16][data]…[202] framing. Side-by-side with + // the chunked-row above this isolates AsyncPipe-framework-overhead (Δ vs raw) from + // kernel-transport-overhead (raw vs in-process Byte[]). + new AcBinaryNamedPipeRawByteArrayBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeRaw)"), }; } @@ -528,17 +542,20 @@ public static class Program /// returning the median elapsed time. Multi-sample design reduces single-run variance from ~±15% to ~±5% /// by smoothing transient effects (background activity, thermal/turbo state, JIT tier-promotion timing). /// When <= 1, falls back to single-sample timing (Debug / quick mode). + /// When is non-null, emits in-place \r progress updates so a + /// stuck benchmark (e.g. deadlocked NamedPipe row) is visibly stuck at a specific %% rather than + /// silently hanging. /// - private static double RunTimed(Action action, int iterations) + private static double RunTimed(Action action, int iterations, string? progressLabel = null) { var samples = BenchmarkSamples; if (samples <= 1) { // Single-sample fast path (Debug or trivial run) — no allocation, no sort. var sw = Stopwatch.StartNew(); - for (var i = 0; i < iterations; i++) action(); - + RunWithProgress(action, iterations, progressLabel, samples: 1, sampleIndex: 0); sw.Stop(); + EndProgress(progressLabel, sw.Elapsed.TotalMilliseconds); return sw.Elapsed.TotalMilliseconds; } @@ -546,30 +563,34 @@ public static class Program for (var s = 0; s < samples; s++) { var sw = Stopwatch.StartNew(); - for (var i = 0; i < iterations; i++) action(); - + RunWithProgress(action, iterations, progressLabel, samples, s); sw.Stop(); times[s] = sw.Elapsed.TotalMilliseconds; } Array.Sort(times); // Median: middle value for odd sample counts, average of two middles for even counts. - return samples % 2 == 1 ? times[samples / 2] : (times[samples / 2 - 1] + times[samples / 2]) / 2.0; + var medianMs = samples % 2 == 1 ? times[samples / 2] : (times[samples / 2 - 1] + times[samples / 2]) / 2.0; + EndProgress(progressLabel, medianMs); + return medianMs; } /// /// Measures per-call allocation in bytes after a clean GC. Single dedicated sample (no median) — keeps timing samples pure. /// - private static long MeasureAllocation(Action action, int iterations) + private static long MeasureAllocation(Action action, int iterations, string? progressLabel = null) { GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); + var sw = Stopwatch.StartNew(); var before = GC.GetAllocatedBytesForCurrentThread(); - for (var i = 0; i < iterations; i++) action(); + RunWithProgress(action, iterations, progressLabel, samples: 1, sampleIndex: 0); var after = GC.GetAllocatedBytesForCurrentThread(); + sw.Stop(); + EndProgress(progressLabel, sw.Elapsed.TotalMilliseconds); return (after - before) / iterations; } @@ -581,19 +602,82 @@ public static class Program /// Slightly noisier than the per-thread variant (background threads / GC bookkeeping leak in), but /// over 1000 iterations the signal dominates. /// - private static long MeasureAllocationTotal(Action action, int iterations) + private static long MeasureAllocationTotal(Action action, int iterations, string? progressLabel = null) { GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); + var sw = Stopwatch.StartNew(); var before = GC.GetTotalAllocatedBytes(precise: true); - for (var i = 0; i < iterations; i++) action(); + RunWithProgress(action, iterations, progressLabel, samples: 1, sampleIndex: 0); var after = GC.GetTotalAllocatedBytes(precise: true); + sw.Stop(); + EndProgress(progressLabel, sw.Elapsed.TotalMilliseconds); return (after - before) / iterations; } + // ============================================================================================ + // Progress reporting — \r-driven in-place updates so a stuck benchmark surfaces the exact phase + // and % where it stopped, instead of appearing as a silent hang. Used by RunTimed and the + // MeasureAllocation* helpers when the caller passes a non-null progressLabel. + // ============================================================================================ + + // Tracks the longest line written by the current progress session, so EndProgress can clear + // any leftover characters from a prior longer line (avoids "ghost" trailing chars after \r). + private static int _progressLastLineLen; + + /// + /// Runs times, emitting \r-overwriting + /// progress every ~10% (approx. 10 progress prints per sample). When + /// is null, runs without any progress output (zero overhead beyond a null check per iter). + /// + private static void RunWithProgress(Action action, int iterations, string? label, int samples, int sampleIndex) + { + if (label is null) + { + for (var i = 0; i < iterations; i++) action(); + return; + } + + // ~10 progress emits per sample run. Avoid emitting on every iter (Console.Write is + // expensive enough to skew sub-µs benchmarks if overdone). + var step = Math.Max(1, iterations / 10); + for (var i = 0; i < iterations; i++) + { + action(); + if ((i + 1) % step == 0 || i == iterations - 1) + { + var pct = (int)((i + 1) * 100L / iterations); + var line = samples > 1 + ? $" > {label} sample {sampleIndex + 1}/{samples} {pct,3}% ({i + 1}/{iterations})" + : $" > {label} {pct,3}% ({i + 1}/{iterations})"; + System.Console.Write('\r'); + System.Console.Write(line); + if (line.Length < _progressLastLineLen) + System.Console.Write(new string(' ', _progressLastLineLen - line.Length)); + _progressLastLineLen = line.Length; + } + } + } + + /// + /// Closes a progress line cleanly: clears any leftover chars and writes a final "done" line on + /// the same row, terminated by \n so subsequent WriteLine calls render below. + /// + private static void EndProgress(string? label, double elapsedMs) + { + if (label is null) return; + var done = $" > {label} done in {elapsedMs,7:F1} ms"; + System.Console.Write('\r'); + System.Console.Write(done); + if (done.Length < _progressLastLineLen) + System.Console.Write(new string(' ', _progressLastLineLen - done.Length)); + System.Console.WriteLine(); + _progressLastLineLen = 0; + } + private static readonly JsonSerializerOptions VerifyJsonOpts = new() { WriteIndented = false, @@ -1121,6 +1205,213 @@ public static class Program } } + /// + /// Raw byte[] over a long-lived NamedPipe — NO chunk-framing, NO AsyncPipeReaderInput, + /// NO sliding-window buffer. Calling thread serialises + writes; a long-lived background read-thread + /// drains the pipe sync; calling thread deserialises. The background-read mirrors + /// 's drain task — needed to avoid a kernel-buffer-full deadlock + /// when bytes.Length > inBufferSize (Large/Repeated/Deep payloads on a 16 KB pipe-buffer). + /// + /// Side-by-side with (chunked-framed AsyncPipe stack) this + /// isolates two cost components on the SAME kernel-pipe transport with the SAME inBufferSize: + /// + /// This row vs (Byte[]) — pure kernel-NamedPipe + /// overhead (WriteFile / ReadFile syscalls + IRP queueing + buffer-copy + thread-handoff). + /// This row vs (chunked-framed) — pure + /// AsyncPipe-framework overhead (chunk header writes + sliding-window Feed + MRES wait inside + /// AsyncPipeReaderInput). Both rows have a long-lived read-thread; only the framing differs. + /// + /// Per-iter byte[] allocation from AcBinarySerializer.Serialize is part of the cost (matches + /// 's API contract); the receive-side scratch buffer is also allocated per-iter + /// on the read-thread (counted via GC.GetTotalAllocatedBytes in MeasureAllocationTotal). + /// + private sealed class AcBinaryNamedPipeRawByteArrayBenchmark : ISerializerBenchmark, IDisposable + { + private readonly TestOrder _order; + private readonly AcBinarySerializerOptions _options; + private readonly byte[] _serialized; // for SerializedSize reporting + receive-side size known upfront + + // Long-lived pipe lifecycle (set up once in ctor — NOT timed). + private readonly NamedPipeServerStream _pipeServer; + private readonly NamedPipeClientStream _pipeClient; + + // Long-lived read-thread infrastructure — mirrors AcBinaryNamedPipeBenchmark's _drainTask. Needed + // to prevent kernel-buffer-full deadlock: when bytes.Length > inBufferSize, _pipeClient.Write + // blocks until the kernel buffer drains, but the drain only happens when SOMETHING reads from the + // server end. Single-thread Write→Read sequencing → deadlock. Background-read overlaps the read. + private readonly CancellationTokenSource _cts; + private readonly Task _readTask; + private readonly ManualResetEventSlim _readRequest = new(false); + private readonly ManualResetEventSlim _readDone = new(false); + private int _pendingReadSize; + private byte[]? _receivedSlot; + private bool _disposed; + + public string Engine => EngineAcBinary; + public string IoMode => IoNamedPipeRaw; + public string DispatchMode => _options.UseGeneratedCode ? ModeSGen : ModeRuntime; + public string OptionsPreset { get; } + public int SerializedSize => _serialized.Length; + public long SetupSerializeAllocBytes { get; } + public long SetupDeserializeAllocBytes { get; } + public bool IsRoundTripOnly => true; + public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(raw,bg-read)"); + + public AcBinaryNamedPipeRawByteArrayBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) + { + _order = order; + // BufferWriterChunkSize comes from the caller — same source-of-truth contract as + // AcBinaryNamedPipeBenchmark. The kernel pipe-buffer (inBufferSize) is wired to it so the + // raw-vs-chunked comparison runs on identical transport conditions. + _options = options; + OptionsPreset = optionsPreset; + + _serialized = AcBinarySerializer.Serialize(order, _options); + + var pipeName = $"AcBinaryBenchRaw-{Guid.NewGuid():N}"; + + // === SERIALIZE-side setup measurement === + // pipe-pair (server + client) + connect handshake. NO PipeWriter wrapper — we use the raw + // Stream.Write API directly, matching the no-framing semantics of this benchmark. + GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); + var beforeSer = GC.GetAllocatedBytesForCurrentThread(); + _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, + System.IO.Pipes.PipeOptions.Asynchronous, + inBufferSize: _options.BufferWriterChunkSize, + outBufferSize: _options.BufferWriterChunkSize); + _pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous); + + var serverWait = _pipeServer.WaitForConnectionAsync(); + _pipeClient.Connect(); + serverWait.GetAwaiter().GetResult(); + var afterSer = GC.GetAllocatedBytesForCurrentThread(); + SetupSerializeAllocBytes = afterSer - beforeSer; + + // === DESERIALIZE-side setup measurement === + // 1× background read-thread + 2× MRES (request / done) + cancellation source. Matches the + // chunked benchmark's deserialize-side setup cost shape (it has 1× drain Task + AsyncPipeReaderInput + // with 1× MRES + ArrayPool rent). + GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); + var beforeDes = GC.GetAllocatedBytesForCurrentThread(); + _cts = new CancellationTokenSource(); + _readTask = Task.Run(ReadLoop); + var afterDes = GC.GetAllocatedBytesForCurrentThread(); + SetupDeserializeAllocBytes = afterDes - beforeDes; + } + + // Long-lived read-loop on a background thread. Pattern: wait for request → drain N bytes → publish + // result via _receivedSlot → signal done. The calling thread provides the size via _pendingReadSize + // BEFORE setting _readRequest, so the read-thread always knows how much to read. + private void ReadLoop() + { + var ct = _cts.Token; + try + { + while (!ct.IsCancellationRequested) + { + _readRequest.Wait(ct); + if (ct.IsCancellationRequested) break; + _readRequest.Reset(); + + var size = _pendingReadSize; + var bytes = new byte[size]; // per-iter alloc — counted by MeasureAllocationTotal + var totalRead = 0; + while (totalRead < size) + { + var n = _pipeServer.Read(bytes, totalRead, size - totalRead); + if (n == 0) break; // pipe closed / EOF — partial read returned to caller + totalRead += n; + } + _receivedSlot = bytes; + _readDone.Set(); + } + } + catch (OperationCanceledException) + { + // Cooperative cancel — Dispose path. Swallow. + } + catch + { + // Any other error during teardown → swallow; the calling thread's _readDone.Wait() + // would then time out, surfaced by the dispose timeout below. + } + } + + public void Warmup(int iterations) + { + for (var i = 0; i < iterations; i++) + { + Serialize(); + } + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Serialize() + { + // Sender: serialize → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark API contract). + var bytes = AcBinarySerializer.Serialize(_order, _options); + + // Hand off the expected size to the read-thread BEFORE signalling — read-thread reads + // _pendingReadSize after _readRequest.Wait returns, so write-then-set ordering is sufficient + // (MRES.Set has release semantics; MRES.Wait has acquire). + _pendingReadSize = bytes.Length; + _readDone.Reset(); + _readRequest.Set(); + + // Sync write on calling thread, OVERLAPPING with the read-thread's Read loop. The kernel + // buffer may fill (bytes.Length > inBufferSize) — Write blocks; the read-thread drains; + // Write resumes. Total wall time ≈ Write-bound or Read-bound, whichever is slower. + _pipeClient.Write(bytes, 0, bytes.Length); + _pipeClient.Flush(); + + // Wait for the read-thread to finish accumulating the message. + _readDone.Wait(); + var receivedBytes = _receivedSlot!; + + _ = AcBinaryDeserializer.Deserialize(receivedBytes, _options); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Deserialize() + { + // No-op: per-iter round-trip is captured in Serialize(). See IsRoundTripOnly contract. + } + + public bool VerifyRoundTrip() + { + // Inlined version of Serialize() that captures the deserialised graph (Serialize()'s + // discard-pattern is correct for the timed loop but useless for verification). + var bytes = AcBinarySerializer.Serialize(_order, _options); + _pendingReadSize = bytes.Length; + _readDone.Reset(); + _readRequest.Set(); + _pipeClient.Write(bytes, 0, bytes.Length); + _pipeClient.Flush(); + _readDone.Wait(); + var received = _receivedSlot!; + var result = AcBinaryDeserializer.Deserialize(received, _options); + return result != null && DeepEqualsViaJson(_order, result); + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + // Cancel the read-loop → ReadLoop exits its Wait via OperationCanceledException. + try { _cts.Cancel(); } catch { /* swallow on teardown */ } + try { _readRequest.Set(); } catch { /* nudge in case Wait is parked */ } + try { _readTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } + + // Symmetric teardown — close client first (writer side), then server. + try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ } + try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ } + try { _readRequest.Dispose(); } catch { /* swallow on teardown */ } + try { _readDone.Dispose(); } catch { /* swallow on teardown */ } + try { _cts.Dispose(); } catch { /* swallow on teardown */ } + } + } + /// /// Benchmarks MemoryPack via the IBufferWriter overload, allocating a FRESH ArrayBufferWriter on EVERY call. /// Apples-to-apples counterpart to AcBinaryFreshBufferWriterBenchmark. @@ -1800,6 +2091,20 @@ public static class Program var acBinaryDesResults2 = results.Where(r => (r.Engine == EngineAcBinary && r.IoMode == IoByteArray && r.DispatchMode == ModeSGen) && r.DeserializeTimeMs > 0).ToList(); var acBinaryRtResults2 = results.Where(r => (r.Engine == EngineAcBinary && r.IoMode == IoByteArray && r.DispatchMode == ModeSGen) && r.RoundTripTimeMs > 0).ToList(); + // Skip comparison block if either side has no Byte[] data — happens in AsyncPipe-only mode + // where only NamedPipe rows exist (no MemoryPack baseline, no AcBinary Byte[] reference). + // Mirrors the same early-return guard in PrintGroupedResults. + if (memPackRtResults2.Count == 0 || acBinaryRtResults2.Count == 0) + { + sb.AppendLine(" (Comparison requires both serialize and deserialize data)"); + File.WriteAllText(logFilePath, sb.ToString(), Utf8NoBom); + System.Console.WriteLine($"✓ Results saved to: {logFilePath}"); + + var llmFilePathEarly = Path.Combine(ResultsDirectory, $"{baseFileName}.LLM"); + SaveLlmResults(llmFilePathEarly, results, testDataSets); + return; + } + if (memPackSerResults2.Count > 0 && acBinarySerResults2.Count > 0) { var memPackAvgSer2 = memPackSerResults2.Average(r => r.SerializeTimeMs); diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md index 43f42e5..2f7a491 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md +++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md @@ -118,7 +118,31 @@ The multi-message wire format (`AsyncPipeWriterOutput` + `AsyncPipeReaderInput` **Cross-references:** - Wire format definition: `AsyncPipeWriterOutput.cs` (`ChunkDataMarker = 201`, `ChunkEndMarker = 202`) -- The `[200]` CHUNK_START marker is reserved/tolerated but currently has no semantics — could host a future stream-id extension if needed (out-of-scope today) +- The `[200] = 0xC8` CHUNK_START marker is currently reserved/tolerated with no semantics — the consensus mux-mode design (see `M2X7`) activates this code point as the mux-data marker (`[0xC8][streamId][UINT16 size][data]`), making each chunk self-identify its mode without needing a session handshake. Single-mode wire-format (`[0xC9]`) stays unchanged → SignalR protocol compat preserved. +- Symmetric reader-side limit: `ACCORE-BIN-I-C4N7` +- Tracked direction (additive wire-format extension on the reserved `[0xC8]` byte): [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7-multi-stream-multiplexer-support-marker-byte-mode-distinction) + +### ACCORE-BIN-I-C4N7: AsyncPipeReaderInput single-consumer constraint + +**Status:** Open (intentional limit) +**Affects:** `AsyncPipeReaderInput` (consumer-side state machine) +**Reach:** any deployment that wants multiple consumers to read concurrently from the same long-lived input — e.g. parallel-dispatch RPC over a single NamedPipe where responses for in-flight calls return out of order. + +**Symptom:** Two threads concurrently calling `AcBinaryDeserializer.Deserialize(input, opts)` on the same `AsyncPipeReaderInput` race on the sliding-window buffer cursor (`_readPos` / `_writePos`), the framing-state machine, and the `Initialize` snapshot. Outcomes range from `AcBinaryDeserializationException` (mid-message state corruption) to silent graph corruption (buffer-position aliasing where consumer A reads bytes consumer B was meant to receive). + +**Root cause:** The reader's API contract assumes **one consumer thread** per input. `Initialize` / `TryAdvanceSegment` / `MessageDone` operate on shared mutable state without lock-free safety; the drain task (producer) is the only thread cleared to `Feed` concurrently with the consumer, and only via the buffer-handoff MRES protocol — not via a multi-reader-safe primitive. + +**Why intentional:** Adding consumer-side concurrency primitives inside the reader would impose per-call thread-safety overhead (locks or atomic state transitions) on the 99% single-consumer use case. The wire format today has no stream-id field on `[0xC9]` chunks (see `ACCORE-BIN-I-M9P3` — symmetric writer-side limit), so demultiplexing requires either a coordinator above the existing class **or** an opt-in mux-mode marker (the `M2X7` direction activates `[0xC8]` for exactly this purpose). + +**Workarounds:** +- **External serialization at consumer**: `lock` / `SemaphoreSlim` around `Deserialize` calls — works but kills the parallelism benefit +- **One AsyncPipeReaderInput per logical stream**: each stream gets a separate input + drain task pair — costs one ArrayPool buffer + one MRES + one Task per stream, but isolates state cleanly +- **Marker-distinguished mux-mode** (consensus future direction): the producer emits `[0xC8][streamId][UINT16 size][data]` for mux chunks; the reader's framing-state-machine reads the streamId byte and dispatches each chunk to the matching per-stream child input. Single-stream wire-format (`[0xC9]`) stays unchanged. See [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7-multi-stream-multiplexer-support-marker-byte-mode-distinction). + +**Cross-references:** +- `ACCORE-BIN-I-M9P3` — symmetric writer-side limit (concurrent writers interleave on the wire) +- `ACCORE-BIN-I-Q4T8` — distinct concern: multi-message reuse on a single input, not multi-consumer concurrency +- Tracked direction: [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7-multi-stream-multiplexer-support-marker-byte-mode-distinction) ### ACCORE-BIN-I-T6V2: Single fixed type per long-lived stream diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md index df9d712..761426f 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_TODO.md @@ -138,3 +138,58 @@ Extend the `AsyncPipeReaderInput` class summary with an explicit pattern catalog **Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2-chunk-on-wire-size--chunksize--headersize-caused-page-fragmentation) A documentation section covering the alignment of the user-space `chunkSize` and the transport-level kernel buffer (general principle, not transport-specific): page-aligned matching avoids fragmentation, mismatched sizes cause two kernel-page transfers per chunk. Reference benchmark archive(s) where empirical numbers exist. + +## ACCORE-BIN-T-M2X7: Multi-stream multiplexer support (marker-byte mode distinction) +**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-m9p3`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-m9p3-no-native-chunk-multiplexing-for-concurrent-writers), [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-c4n7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-c4n7-asyncpipereaderinput-single-consumer-constraint) + +Enable N parallel logical streams over a single long-lived `Stream` (NamedPipe / NetworkStream / FileStream). Use cases: distributed logging fan-in (many threads → one pipe), telemetry sinks (many event sources → one collector), parallel-dispatch RPC over raw NamedPipe (multiple in-flight calls). NOT needed for SignalR — Kestrel multiplexes at the WebSocket / HTTP-2 layer above and the `AcBinaryHubProtocol` already receives a sequential byte stream. + +**Wire-format design (consensus direction):** + +Each chunk's first byte (the marker) self-identifies the mode — no session-handshake byte, no out-of-band config, no silent-corruption mismatch risk: + +``` +Single-mode (UNCHANGED — wire-format-compat with current SignalR protocol): + [0xC9 = 201][UINT16 size][data] ← chunk-data marker (current AsyncPipeWriterOutput.ChunkDataMarker) + [0xCA = 202] ← chunk-end marker + +Mux-mode (NEW — activates the currently-reserved [0xC8 = 200] byte): + [0xC8 = 200][streamId][UINT16 size][data] ← mux-chunk-data marker + 1-byte streamId + [end-marker — design TBD, see "Open design questions" below] +``` + +The reader switches on the first byte: `0xC9` → single-mode parse, `0xC8` → mux-mode parse (read streamId, dispatch to per-stream child input). Mixed-mode chunks on the same stream are wire-format-legal (each chunk self-identifies) — useful for migration scenarios, not for typical use. + +**Why this design over alternatives** (decisions captured for future maintainers): + +- **Why not session-handshake byte at start of stream?** Self-describing chunks survive reconnect / partial-truncation / sender-restart without losing mode-context. The handshake-byte approach has a silent-corruption risk if a receiver misses the handshake (e.g. attaches mid-stream). +- **Why not bit-pack streamId+type+length into one byte?** Length-bit-packing (e.g. "full-chunk flag" omitting the size field) saves ~0.012% wire bytes on large messages but couples producer/receiver chunkSize, breaks debug-readability, and constrains future protocol extensions. ROI doesn't pay back. Length stays `[UINT16 size]` per chunk — robust, debuggable, future-proof. +- **Why `[0xC8]` for mux?** Already documented as reserved/tolerated for future stream-id extension (see `ACCORE-BIN-I-M9P3` cross-reference). Activating it is the natural use of the reserved code-point — no new wire-byte allocation needed. + +**Implementation approach (consensus direction):** + +Unified class with mode flag — the standard and mux modes share **one code path** distinguished by a `bool _isMux` (or equivalent stream-id container). A handful of branches at the marker-write / marker-parse points cover the difference (~1-2 ns per chunk, negligible vs the ~50–200 µs RT cost). This is preferred over a strategy-generic refactor (`AsyncPipeWriterCore`) because the few extra ns per chunk are invisible in production workloads, while the strategy-generic adds compile-time complexity (struct-strategy boilerplate, generic-constraint gymnastics) without commensurate runtime benefit at the typical 1k–1M ops/sec working ranges. + +Per side: + +- **Writer side**: `AsyncPipeWriterOutput` (struct) gains an opt-in `streamId` ctor parameter (default = "single-mode, no streamId byte emitted"). When set, the chunk-write loop emits `[0xC8][streamId]` instead of `[0xC9]`. The `IBinaryOutputBase` API surface is unchanged — `AcBinarySerializer` doesn't see any difference. JIT-elimination of the mode-branch is unnecessary because the cost is already negligible. + +- **Reader side**: `AsyncPipeReaderInput` (sealed class) gains a multi-stream coordinator overload. Two possible shapes (pick at impl-time): + - **Inline mode flag**: a `bool _muxMode` ctor flag controls whether the framing-state-machine reads a streamId byte after `[0xC8]`. Single-stream consumer uses one input, multi-stream consumer uses N child inputs accessed via `GetStream(streamId)` on a separate coordinator class. + - **Coordinator + child instances**: a new `AsyncPipeMuxReaderInput` (sealed class) holds N child `AsyncPipeReaderInput` instances + 1 demux task; the existing core class is untouched. Cleaner separation, more allocations. + +**Open design questions** (decide before implementing): + +- **End-marker shape in mux-mode**: three candidates, pick one: + - (a) New marker byte `[0xCB][streamId]` for mux-end (uses next reserved code point) + - (b) Reuse `[0xCA]` with stream-id suffix in mux context: `[0xCA][streamId]` (parser knows from prior chunks that current frame is mux-mode) + - (c) Per-chunk "is-final" flag in the size field high bit (eliminates explicit end marker — caps chunk size at 32 KB, currently 16 KB hardlimit so non-shrinking) +- **Stream-id space sizing**: 1-byte `[0..255]` covers most use cases; 2-byte if N > 256 streams expected. Affects per-chunk overhead by 1 vs 2 bytes. The 1-byte choice has a forward-compat path via reserving `0xFF` as "extended id follows". +- **Dispose / lifecycle propagation**: the mux-coordinator's `Dispose()` must tear down all child instances cleanly. Mirror of `ACCORE-BIN-I-W6K4` (disposal trap on the core class) at the coordinator level. +- **Reader-side asymmetry**: the writer is a struct (hot-path generic constraint), the reader is a sealed class. The mode-flag approach is symmetric for the writer (1 byte stream-id field next to existing fields) but on the reader the multi-stream case fundamentally needs N child instances + demux — composition wins there regardless of direction. Decide whether to use one unified class on both sides (writer easy, reader still needs a coordinator) or unified class on writer side + separate coordinator class on reader side. + +**Acceptance:** +- Single-mode (standard) wire format byte-equivalent to today's output for any payload (regression test guards against accidental wire changes; SignalR protocol compat preserved). +- Concurrent multi-producer test: N producer threads simultaneously serialise into N stream-id-distinct writers; receiver demultiplexes and verifies each stream's graph independently. No interleaving corruption. +- Microbenchmark: single-stream sequential RT regresses by ≤ ~5 ns/op vs current (a few branch-predicted `if` per chunk; not strategy-generic-grade zero-overhead but production-invisible). +- Wire-format documentation updated in [`BINARY_FORMAT.md`](BINARY_FORMAT.md) with the `[0xC8]` mux-data activation and the chosen end-marker shape. diff --git a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md index b382700..53bdc11 100644 --- a/AyCode.Core/docs/BINARY/BINARY_WRITERS.md +++ b/AyCode.Core/docs/BINARY/BINARY_WRITERS.md @@ -120,6 +120,25 @@ Constructor parameter `waitForFlush` (default `true`): In both modes, flush is only initiated when `_lastFlush.IsCompleted` — no overlapping FlushAsync calls. +### Two parallel-flush regimes (auto-detected) + +Runtime check `pipeWriter.GetType()` splits flush behavior into two regimes — auto-detected at ctor via `_serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter)`. No caller intervention. Orthogonal to `waitForFlush` and to the wire-format mode choice (`Bytes` / `Segment` / `AsyncSegment`). + +**True parallel** — Pipe-based / parallel-capable PipeWriters: `new Pipe().Writer`, Kestrel transport output, custom parallel-capable impls. `Grow()` uses `FlushAsync().Forget()` pattern: serializer continues with the next chunk while the network async-flushes the previous one. Round-trip wall-clock = `max(serialize, flush) × N_chunks` — flush hides behind serialize-time. Production-stable on SignalR / Kestrel; "minimally slower than raw byte[]" empirically. + +**Half parallel** — `StreamPipeWriter`-backed transports: `PipeWriter.Create(stream)` for NamedPipe / FileStream / NetworkStream / MemoryStream / SslStream / etc. The BCL `StreamPipeWriter._tailMemory = default` reset on flush completion races against the parallel-acquire pattern, forcing `FlushAsync().GetAwaiter().GetResult()` after every commit. Kernel-IO is strictly sequential. Managed-side parallelism (drain-task, deser-task, calling-thread) still possible, but wall-clock = `(serialize + flush) × N_chunks` — flush accumulates per chunk. + +| Regime | Detected on | Flush pattern in `Grow()` | Wall-clock formula | vs raw `byte[]` (NamedPipe, 1000 iter / 5000 warmup) | +|---|---|---|---|---| +| True parallel | non-StreamPipeWriter | `FlushAsync().Forget()` then continue | `max(ser, flush) × N` | minimal — flush hides behind serialize | +| Half parallel | `StreamPipeWriter` (any stream) | `SyncAwaitFlush(FlushAsync())` | `(ser + flush) × N` | Small 2KB: -2% / Med 7.5KB: +15% / Large 50KB: +34% / Repeated 10KB: +3% / Deep 11KB: +10% | + +**Allocation savings** (chunked vs. raw single-shot `byte[]`) are **regime-independent and payload-size-independent**: ~30% fewer allocated KB per round-trip, because `MemoryMarshal.TryGetArray` direct-buffer-write into the PipeWriter's internal slab eliminates the intermediate pooled-`byte[]` rent that the raw path pays. + +The half-parallel wall-clock cost is **bounded by the BCL `StreamPipeWriter` design**, not by `AsyncPipeWriterOutput` — the runtime detect is the optimal managed-layer response. Multiple sequential `FlushAsync` round-trips on the kernel-IO are the dominant cost on Stream-backed transports; payload-size-arányos as the chunk count grows. JIT tier-1 warmup matters: ~5000 warmup iterations needed to stabilize the async-state-machine code (vs ~500 for the raw path) — see `BINARY_TODO.md#accore-bin-t-t5j8`. + +**Real-world latency-budget context.** The +34% slowdown on Large-payload NamedPipe is +66 µs absolute. Typical IPC / HTTP / SignalR / frame-budget round-trips are 1-50 ms, where the absolute differential is 0.13-5% — within the noise floor. Choose chunked for: GC-pressure-sensitive hot-paths (~30% alloc savings, 0 GC-pressure for the serialize), memory-bounded hosts (chunk-bounded peak vs. payload-bounded), GB-scale payloads (Bytes mode OOMs), and parallel-capable transports (true-parallel pipeline overlap). Avoid chunked when: the workload is wall-clock-throughput-bound on a Stream-backed transport AND payload is Medium-to-Large AND GC-pressure is tolerable. + ### Wire Format (per chunk) ```