[LOADED_DOCS: 3 files, no new loads]

Add raw NamedPipe benchmark & mux-mode AsyncPipe docs

- Add AcBinaryNamedPipeRawByteArrayBenchmark for raw NamedPipe (no chunking) to isolate kernel vs. AsyncPipe overhead
- Refactor progress reporting with in-place updates for all timed/allocation benchmarks
- Document [0xC8] marker as mux-mode direction; add ACCORE-BIN-T-M2X7 and ACCORE-BIN-I-C4N7 for multi-stream and single-consumer constraints
- Expand BINARY_WRITERS.md with parallel-flush regime analysis and allocation context
- Improve result comparison robustness for AsyncPipe-only runs
- Minor doc clarifications and explicit AsyncPipeReaderInput usage patterns
This commit is contained in:
Loretta 2026-05-02 00:03:22 +02:00
parent 329c9c2928
commit a537f18294
4 changed files with 430 additions and 27 deletions

View File

@ -37,6 +37,16 @@ public static class Program
private const string BuildConfiguration = "Release";
#endif
#if DEBUG
private static int WarmupIterations = 0;
private static int TestIterations = 1;
private static int BenchmarkSamples = 1; // Debug: single sample, fast iteration
#else
private static int WarmupIterations = 5000; //5000
private static int TestIterations = 1000; //1000
private static int BenchmarkSamples = 3;
#endif
// Serializer name constants
// Engine identifiers (used in Engine column + comparison logic)
private const string EngineAcBinary = "AcBinary";
@ -50,6 +60,7 @@ public static class Program
private const string IoBufWrNew = "BufWr new";
private const string IoString = "String";
private const string IoNamedPipe = "NamedPipe";
private const string IoNamedPipeRaw = "NamedPipe";
// Dispatch mode identifiers — describes how property access / type dispatch happens for a given run.
// SGen = compile-time source generator path (Unsafe.As<T> direct fields, slot-array wrapper lookup).
@ -132,16 +143,6 @@ public static class Program
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static double ToKilobytes(long bytes) => bytes / 1024.0;
#if DEBUG
private static int WarmupIterations = 0;
private static int TestIterations = 1;
private static int BenchmarkSamples = 1; // Debug: single sample, fast iteration
#else
private static int WarmupIterations = 5000; //5000
private static int TestIterations = 1000; //1000
private static int BenchmarkSamples = 3;
#endif
public static void Main(string[] args)
{
// Set console encoding to UTF-8 for proper Unicode character display
@ -385,6 +386,10 @@ public static class Program
IsRoundTripOnly = serializer.IsRoundTripOnly
};
// Group label for in-place \r progress. Identifies (cell × serializer) so a stuck benchmark
// is visibly stuck on a specific row at a specific %% rather than silently hanging.
var groupLabel = $"{result.SerializerName}";
if (serializer.IsRoundTripOnly)
{
// Round-trip-only benchmarks (NamedPipe etc.): measure the full pipe round-trip directly into the RT
@ -393,8 +398,8 @@ public static class Program
// also show up — otherwise current-thread alloc would only count the client side and look ~halved.
if (mode is "all" or "serialize" or "ser")
{
result.RoundTripTimeMs = RunTimed(() => serializer.Serialize(), TestIterations);
result.RoundTripAllocBytesPerOp = MeasureAllocationTotal(() => serializer.Serialize(), TestIterations);
result.RoundTripTimeMs = RunTimed(() => serializer.Serialize(), TestIterations, $"{groupLabel} [RT timing]");
result.RoundTripAllocBytesPerOp = MeasureAllocationTotal(() => serializer.Serialize(), TestIterations, $"{groupLabel} [RT alloc]");
}
// mode == "deserialize" alone is meaningless for a round-trip-only benchmark; skip silently.
}
@ -402,15 +407,15 @@ public static class Program
{
if (mode is "all" or "serialize" or "ser")
{
result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations);
result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations, $"{groupLabel} [Ser timing]");
// Dedicated alloc-only sample (separate from timing samples; keeps timing pure)
result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations);
result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations, $"{groupLabel} [Ser alloc]");
}
if (mode is "all" or "deserialize" or "des")
{
result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations);
result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations);
result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations, $"{groupLabel} [Des timing]");
result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations, $"{groupLabel} [Des alloc]");
}
// Compose RT from Ser+Des (the previously computed property's behavior, now explicit since RT is settable).
@ -448,7 +453,16 @@ public static class Program
return new List<ISerializerBenchmark>
{
// Chunked-framed AsyncPipe: SerializeChunkedFramed + AsyncPipeReaderInput.DrainFromAsync.
// Measures the FULL streaming-I/O stack — wire framing + drain task + sliding-window buffer +
// MRES wait-on-byte-shortage — over a kernel NamedPipe.
new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeChunk)"),
// Raw byte[] over NamedPipe (sync receive, no chunk-framing). Same kernel-pipe transport,
// same inBufferSize, but: serialize → byte[] → Stream.Write → Stream.Read → Deserialize<T>(byte[]).
// No drain task, no AsyncPipeReaderInput, no [201][UINT16][data]…[202] framing. Side-by-side with
// the chunked-row above this isolates AsyncPipe-framework-overhead (Δ vs raw) from
// kernel-transport-overhead (raw vs in-process Byte[]).
new AcBinaryNamedPipeRawByteArrayBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeRaw)"),
};
}
@ -528,17 +542,20 @@ public static class Program
/// returning the median elapsed time. Multi-sample design reduces single-run variance from ~±15% to ~±5%
/// by smoothing transient effects (background activity, thermal/turbo state, JIT tier-promotion timing).
/// When <see cref="BenchmarkSamples"/> &lt;= 1, falls back to single-sample timing (Debug / quick mode).
/// When <paramref name="progressLabel"/> is non-null, emits in-place <c>\r</c> progress updates so a
/// stuck benchmark (e.g. deadlocked NamedPipe row) is visibly stuck at a specific %% rather than
/// silently hanging.
/// </summary>
private static double RunTimed(Action action, int iterations)
private static double RunTimed(Action action, int iterations, string? progressLabel = null)
{
var samples = BenchmarkSamples;
if (samples <= 1)
{
// Single-sample fast path (Debug or trivial run) — no allocation, no sort.
var sw = Stopwatch.StartNew();
for (var i = 0; i < iterations; i++) action();
RunWithProgress(action, iterations, progressLabel, samples: 1, sampleIndex: 0);
sw.Stop();
EndProgress(progressLabel, sw.Elapsed.TotalMilliseconds);
return sw.Elapsed.TotalMilliseconds;
}
@ -546,30 +563,34 @@ public static class Program
for (var s = 0; s < samples; s++)
{
var sw = Stopwatch.StartNew();
for (var i = 0; i < iterations; i++) action();
RunWithProgress(action, iterations, progressLabel, samples, s);
sw.Stop();
times[s] = sw.Elapsed.TotalMilliseconds;
}
Array.Sort(times);
// Median: middle value for odd sample counts, average of two middles for even counts.
return samples % 2 == 1 ? times[samples / 2] : (times[samples / 2 - 1] + times[samples / 2]) / 2.0;
var medianMs = samples % 2 == 1 ? times[samples / 2] : (times[samples / 2 - 1] + times[samples / 2]) / 2.0;
EndProgress(progressLabel, medianMs);
return medianMs;
}
/// <summary>
/// Measures per-call allocation in bytes after a clean GC. Single dedicated sample (no median) — keeps timing samples pure.
/// </summary>
private static long MeasureAllocation(Action action, int iterations)
private static long MeasureAllocation(Action action, int iterations, string? progressLabel = null)
{
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
var sw = Stopwatch.StartNew();
var before = GC.GetAllocatedBytesForCurrentThread();
for (var i = 0; i < iterations; i++) action();
RunWithProgress(action, iterations, progressLabel, samples: 1, sampleIndex: 0);
var after = GC.GetAllocatedBytesForCurrentThread();
sw.Stop();
EndProgress(progressLabel, sw.Elapsed.TotalMilliseconds);
return (after - before) / iterations;
}
@ -581,19 +602,82 @@ public static class Program
/// Slightly noisier than the per-thread variant (background threads / GC bookkeeping leak in), but
/// over 1000 iterations the signal dominates.
/// </summary>
private static long MeasureAllocationTotal(Action action, int iterations)
private static long MeasureAllocationTotal(Action action, int iterations, string? progressLabel = null)
{
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
var sw = Stopwatch.StartNew();
var before = GC.GetTotalAllocatedBytes(precise: true);
for (var i = 0; i < iterations; i++) action();
RunWithProgress(action, iterations, progressLabel, samples: 1, sampleIndex: 0);
var after = GC.GetTotalAllocatedBytes(precise: true);
sw.Stop();
EndProgress(progressLabel, sw.Elapsed.TotalMilliseconds);
return (after - before) / iterations;
}
// ============================================================================================
// Progress reporting — \r-driven in-place updates so a stuck benchmark surfaces the exact phase
// and % where it stopped, instead of appearing as a silent hang. Used by RunTimed and the
// MeasureAllocation* helpers when the caller passes a non-null progressLabel.
// ============================================================================================
// Tracks the longest line written by the current progress session, so EndProgress can clear
// any leftover characters from a prior longer line (avoids "ghost" trailing chars after \r).
private static int _progressLastLineLen;
/// <summary>
/// Runs <paramref name="action"/> <paramref name="iterations"/> times, emitting \r-overwriting
/// progress every ~10% (approx. 10 progress prints per sample). When <paramref name="label"/>
/// is null, runs without any progress output (zero overhead beyond a null check per iter).
/// </summary>
private static void RunWithProgress(Action action, int iterations, string? label, int samples, int sampleIndex)
{
if (label is null)
{
for (var i = 0; i < iterations; i++) action();
return;
}
// ~10 progress emits per sample run. Avoid emitting on every iter (Console.Write is
// expensive enough to skew sub-µs benchmarks if overdone).
var step = Math.Max(1, iterations / 10);
for (var i = 0; i < iterations; i++)
{
action();
if ((i + 1) % step == 0 || i == iterations - 1)
{
var pct = (int)((i + 1) * 100L / iterations);
var line = samples > 1
? $" > {label} sample {sampleIndex + 1}/{samples} {pct,3}% ({i + 1}/{iterations})"
: $" > {label} {pct,3}% ({i + 1}/{iterations})";
System.Console.Write('\r');
System.Console.Write(line);
if (line.Length < _progressLastLineLen)
System.Console.Write(new string(' ', _progressLastLineLen - line.Length));
_progressLastLineLen = line.Length;
}
}
}
/// <summary>
/// Closes a progress line cleanly: clears any leftover chars and writes a final "done" line on
/// the same row, terminated by \n so subsequent <c>WriteLine</c> calls render below.
/// </summary>
private static void EndProgress(string? label, double elapsedMs)
{
if (label is null) return;
var done = $" > {label} done in {elapsedMs,7:F1} ms";
System.Console.Write('\r');
System.Console.Write(done);
if (done.Length < _progressLastLineLen)
System.Console.Write(new string(' ', _progressLastLineLen - done.Length));
System.Console.WriteLine();
_progressLastLineLen = 0;
}
private static readonly JsonSerializerOptions VerifyJsonOpts = new()
{
WriteIndented = false,
@ -1121,6 +1205,213 @@ public static class Program
}
}
/// <summary>
/// Raw <c>byte[]</c> over a long-lived NamedPipe — NO chunk-framing, NO <c>AsyncPipeReaderInput</c>,
/// NO sliding-window buffer. Calling thread serialises + writes; a long-lived background read-thread
/// drains the pipe sync; calling thread deserialises. The background-read mirrors
/// <see cref="AcBinaryNamedPipeBenchmark"/>'s drain task — needed to avoid a kernel-buffer-full deadlock
/// when <c>bytes.Length &gt; inBufferSize</c> (Large/Repeated/Deep payloads on a 16 KB pipe-buffer).
///
/// Side-by-side with <see cref="AcBinaryNamedPipeBenchmark"/> (chunked-framed AsyncPipe stack) this
/// isolates two cost components on the SAME kernel-pipe transport with the SAME <c>inBufferSize</c>:
/// <list type="bullet">
/// <item><description><b>This row vs <see cref="AcBinaryBenchmark"/> (Byte[])</b> — pure kernel-NamedPipe
/// overhead (WriteFile / ReadFile syscalls + IRP queueing + buffer-copy + thread-handoff).</description></item>
/// <item><description><b>This row vs <see cref="AcBinaryNamedPipeBenchmark"/> (chunked-framed)</b> — pure
/// AsyncPipe-framework overhead (chunk header writes + sliding-window <c>Feed</c> + MRES wait inside
/// <c>AsyncPipeReaderInput</c>). Both rows have a long-lived read-thread; only the framing differs.</description></item>
/// </list>
/// Per-iter <c>byte[]</c> allocation from <c>AcBinarySerializer.Serialize</c> is part of the cost (matches
/// <see cref="AcBinaryBenchmark"/>'s API contract); the receive-side scratch buffer is also allocated per-iter
/// on the read-thread (counted via <c>GC.GetTotalAllocatedBytes</c> in <c>MeasureAllocationTotal</c>).
/// </summary>
private sealed class AcBinaryNamedPipeRawByteArrayBenchmark : ISerializerBenchmark, IDisposable
{
private readonly TestOrder _order;
private readonly AcBinarySerializerOptions _options;
private readonly byte[] _serialized; // for SerializedSize reporting + receive-side size known upfront
// Long-lived pipe lifecycle (set up once in ctor — NOT timed).
private readonly NamedPipeServerStream _pipeServer;
private readonly NamedPipeClientStream _pipeClient;
// Long-lived read-thread infrastructure — mirrors AcBinaryNamedPipeBenchmark's _drainTask. Needed
// to prevent kernel-buffer-full deadlock: when bytes.Length > inBufferSize, _pipeClient.Write
// blocks until the kernel buffer drains, but the drain only happens when SOMETHING reads from the
// server end. Single-thread Write→Read sequencing → deadlock. Background-read overlaps the read.
private readonly CancellationTokenSource _cts;
private readonly Task _readTask;
private readonly ManualResetEventSlim _readRequest = new(false);
private readonly ManualResetEventSlim _readDone = new(false);
private int _pendingReadSize;
private byte[]? _receivedSlot;
private bool _disposed;
public string Engine => EngineAcBinary;
public string IoMode => IoNamedPipeRaw;
public string DispatchMode => _options.UseGeneratedCode ? ModeSGen : ModeRuntime;
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupSerializeAllocBytes { get; }
public long SetupDeserializeAllocBytes { get; }
public bool IsRoundTripOnly => true;
public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(raw,bg-read)");
public AcBinaryNamedPipeRawByteArrayBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
_order = order;
// BufferWriterChunkSize comes from the caller — same source-of-truth contract as
// AcBinaryNamedPipeBenchmark. The kernel pipe-buffer (inBufferSize) is wired to it so the
// raw-vs-chunked comparison runs on identical transport conditions.
_options = options;
OptionsPreset = optionsPreset;
_serialized = AcBinarySerializer.Serialize(order, _options);
var pipeName = $"AcBinaryBenchRaw-{Guid.NewGuid():N}";
// === SERIALIZE-side setup measurement ===
// pipe-pair (server + client) + connect handshake. NO PipeWriter wrapper — we use the raw
// Stream.Write API directly, matching the no-framing semantics of this benchmark.
GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
var beforeSer = GC.GetAllocatedBytesForCurrentThread();
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte,
System.IO.Pipes.PipeOptions.Asynchronous,
inBufferSize: _options.BufferWriterChunkSize,
outBufferSize: _options.BufferWriterChunkSize);
_pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
var serverWait = _pipeServer.WaitForConnectionAsync();
_pipeClient.Connect();
serverWait.GetAwaiter().GetResult();
var afterSer = GC.GetAllocatedBytesForCurrentThread();
SetupSerializeAllocBytes = afterSer - beforeSer;
// === DESERIALIZE-side setup measurement ===
// 1× background read-thread + 2× MRES (request / done) + cancellation source. Matches the
// chunked benchmark's deserialize-side setup cost shape (it has 1× drain Task + AsyncPipeReaderInput
// with 1× MRES + ArrayPool rent).
GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
var beforeDes = GC.GetAllocatedBytesForCurrentThread();
_cts = new CancellationTokenSource();
_readTask = Task.Run(ReadLoop);
var afterDes = GC.GetAllocatedBytesForCurrentThread();
SetupDeserializeAllocBytes = afterDes - beforeDes;
}
// Long-lived read-loop on a background thread. Pattern: wait for request → drain N bytes → publish
// result via _receivedSlot → signal done. The calling thread provides the size via _pendingReadSize
// BEFORE setting _readRequest, so the read-thread always knows how much to read.
private void ReadLoop()
{
var ct = _cts.Token;
try
{
while (!ct.IsCancellationRequested)
{
_readRequest.Wait(ct);
if (ct.IsCancellationRequested) break;
_readRequest.Reset();
var size = _pendingReadSize;
var bytes = new byte[size]; // per-iter alloc — counted by MeasureAllocationTotal
var totalRead = 0;
while (totalRead < size)
{
var n = _pipeServer.Read(bytes, totalRead, size - totalRead);
if (n == 0) break; // pipe closed / EOF — partial read returned to caller
totalRead += n;
}
_receivedSlot = bytes;
_readDone.Set();
}
}
catch (OperationCanceledException)
{
// Cooperative cancel — Dispose path. Swallow.
}
catch
{
// Any other error during teardown → swallow; the calling thread's _readDone.Wait()
// would then time out, surfaced by the dispose timeout below.
}
}
public void Warmup(int iterations)
{
for (var i = 0; i < iterations; i++)
{
Serialize();
}
}
[MethodImpl(MethodImplOptions.NoInlining)]
public void Serialize()
{
// Sender: serialize → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark API contract).
var bytes = AcBinarySerializer.Serialize(_order, _options);
// Hand off the expected size to the read-thread BEFORE signalling — read-thread reads
// _pendingReadSize after _readRequest.Wait returns, so write-then-set ordering is sufficient
// (MRES.Set has release semantics; MRES.Wait has acquire).
_pendingReadSize = bytes.Length;
_readDone.Reset();
_readRequest.Set();
// Sync write on calling thread, OVERLAPPING with the read-thread's Read loop. The kernel
// buffer may fill (bytes.Length > inBufferSize) — Write blocks; the read-thread drains;
// Write resumes. Total wall time ≈ Write-bound or Read-bound, whichever is slower.
_pipeClient.Write(bytes, 0, bytes.Length);
_pipeClient.Flush();
// Wait for the read-thread to finish accumulating the message.
_readDone.Wait();
var receivedBytes = _receivedSlot!;
_ = AcBinaryDeserializer.Deserialize<TestOrder>(receivedBytes, _options);
}
[MethodImpl(MethodImplOptions.NoInlining)]
public void Deserialize()
{
// No-op: per-iter round-trip is captured in Serialize(). See IsRoundTripOnly contract.
}
public bool VerifyRoundTrip()
{
// Inlined version of Serialize() that captures the deserialised graph (Serialize()'s
// discard-pattern is correct for the timed loop but useless for verification).
var bytes = AcBinarySerializer.Serialize(_order, _options);
_pendingReadSize = bytes.Length;
_readDone.Reset();
_readRequest.Set();
_pipeClient.Write(bytes, 0, bytes.Length);
_pipeClient.Flush();
_readDone.Wait();
var received = _receivedSlot!;
var result = AcBinaryDeserializer.Deserialize<TestOrder>(received, _options);
return result != null && DeepEqualsViaJson(_order, result);
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
// Cancel the read-loop → ReadLoop exits its Wait via OperationCanceledException.
try { _cts.Cancel(); } catch { /* swallow on teardown */ }
try { _readRequest.Set(); } catch { /* nudge in case Wait is parked */ }
try { _readTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
// Symmetric teardown — close client first (writer side), then server.
try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
try { _readRequest.Dispose(); } catch { /* swallow on teardown */ }
try { _readDone.Dispose(); } catch { /* swallow on teardown */ }
try { _cts.Dispose(); } catch { /* swallow on teardown */ }
}
}
/// <summary>
/// Benchmarks MemoryPack via the IBufferWriter overload, allocating a FRESH ArrayBufferWriter on EVERY call.
/// Apples-to-apples counterpart to AcBinaryFreshBufferWriterBenchmark.
@ -1800,6 +2091,20 @@ public static class Program
var acBinaryDesResults2 = results.Where(r => (r.Engine == EngineAcBinary && r.IoMode == IoByteArray && r.DispatchMode == ModeSGen) && r.DeserializeTimeMs > 0).ToList();
var acBinaryRtResults2 = results.Where(r => (r.Engine == EngineAcBinary && r.IoMode == IoByteArray && r.DispatchMode == ModeSGen) && r.RoundTripTimeMs > 0).ToList();
// Skip comparison block if either side has no Byte[] data — happens in AsyncPipe-only mode
// where only NamedPipe rows exist (no MemoryPack baseline, no AcBinary Byte[] reference).
// Mirrors the same early-return guard in PrintGroupedResults.
if (memPackRtResults2.Count == 0 || acBinaryRtResults2.Count == 0)
{
sb.AppendLine(" (Comparison requires both serialize and deserialize data)");
File.WriteAllText(logFilePath, sb.ToString(), Utf8NoBom);
System.Console.WriteLine($"✓ Results saved to: {logFilePath}");
var llmFilePathEarly = Path.Combine(ResultsDirectory, $"{baseFileName}.LLM");
SaveLlmResults(llmFilePathEarly, results, testDataSets);
return;
}
if (memPackSerResults2.Count > 0 && acBinarySerResults2.Count > 0)
{
var memPackAvgSer2 = memPackSerResults2.Average(r => r.SerializeTimeMs);

View File

@ -118,7 +118,31 @@ The multi-message wire format (`AsyncPipeWriterOutput` + `AsyncPipeReaderInput`
**Cross-references:**
- Wire format definition: `AsyncPipeWriterOutput.cs` (`ChunkDataMarker = 201`, `ChunkEndMarker = 202`)
- The `[200]` CHUNK_START marker is reserved/tolerated but currently has no semantics — could host a future stream-id extension if needed (out-of-scope today)
- The `[200] = 0xC8` CHUNK_START marker is currently reserved/tolerated with no semantics — the consensus mux-mode design (see `M2X7`) activates this code point as the mux-data marker (`[0xC8][streamId][UINT16 size][data]`), making each chunk self-identify its mode without needing a session handshake. Single-mode wire-format (`[0xC9]`) stays unchanged → SignalR protocol compat preserved.
- Symmetric reader-side limit: `ACCORE-BIN-I-C4N7`
- Tracked direction (additive wire-format extension on the reserved `[0xC8]` byte): [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7-multi-stream-multiplexer-support-marker-byte-mode-distinction)
### ACCORE-BIN-I-C4N7: AsyncPipeReaderInput single-consumer constraint
**Status:** Open (intentional limit)
**Affects:** `AsyncPipeReaderInput` (consumer-side state machine)
**Reach:** any deployment that wants multiple consumers to read concurrently from the same long-lived input — e.g. parallel-dispatch RPC over a single NamedPipe where responses for in-flight calls return out of order.
**Symptom:** Two threads concurrently calling `AcBinaryDeserializer.Deserialize<T>(input, opts)` on the same `AsyncPipeReaderInput` race on the sliding-window buffer cursor (`_readPos` / `_writePos`), the framing-state machine, and the `Initialize` snapshot. Outcomes range from `AcBinaryDeserializationException` (mid-message state corruption) to silent graph corruption (buffer-position aliasing where consumer A reads bytes consumer B was meant to receive).
**Root cause:** The reader's API contract assumes **one consumer thread** per input. `Initialize` / `TryAdvanceSegment` / `MessageDone` operate on shared mutable state without lock-free safety; the drain task (producer) is the only thread cleared to `Feed` concurrently with the consumer, and only via the buffer-handoff MRES protocol — not via a multi-reader-safe primitive.
**Why intentional:** Adding consumer-side concurrency primitives inside the reader would impose per-call thread-safety overhead (locks or atomic state transitions) on the 99% single-consumer use case. The wire format today has no stream-id field on `[0xC9]` chunks (see `ACCORE-BIN-I-M9P3` — symmetric writer-side limit), so demultiplexing requires either a coordinator above the existing class **or** an opt-in mux-mode marker (the `M2X7` direction activates `[0xC8]` for exactly this purpose).
**Workarounds:**
- **External serialization at consumer**: `lock` / `SemaphoreSlim` around `Deserialize<T>` calls — works but kills the parallelism benefit
- **One AsyncPipeReaderInput per logical stream**: each stream gets a separate input + drain task pair — costs one ArrayPool buffer + one MRES + one Task per stream, but isolates state cleanly
- **Marker-distinguished mux-mode** (consensus future direction): the producer emits `[0xC8][streamId][UINT16 size][data]` for mux chunks; the reader's framing-state-machine reads the streamId byte and dispatches each chunk to the matching per-stream child input. Single-stream wire-format (`[0xC9]`) stays unchanged. See [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7-multi-stream-multiplexer-support-marker-byte-mode-distinction).
**Cross-references:**
- `ACCORE-BIN-I-M9P3` — symmetric writer-side limit (concurrent writers interleave on the wire)
- `ACCORE-BIN-I-Q4T8` — distinct concern: multi-message reuse on a single input, not multi-consumer concurrency
- Tracked direction: [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m2x7-multi-stream-multiplexer-support-marker-byte-mode-distinction)
### ACCORE-BIN-I-T6V2: Single fixed type per long-lived stream

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long