From 05f90a56396beae3d9b1d6684ccd172de3d1517d Mon Sep 17 00:00:00 2001 From: Loretta Date: Sat, 2 May 2026 11:55:46 +0200 Subject: [PATCH] [LOADED_DOCS: 3 files, no new loads] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor pipe benchmarks to 2-task streaming model Refactored AcBinaryNamedPipeBenchmark and AcBinaryNamedPipeRawByteArrayBenchmark to use a two-task (producer/consumer) streaming pipeline for deserialization, enabling true Ser↔Des overlap. Reduced BufferWriterChunkSize from 16K to 4K. Updated synchronization, cleanup, and comments to reflect the new architecture and improve performance comparison between chunked and raw byte[] modes. --- AyCode.Core.Serializers.Console/Program.cs | 257 +++++++++++++-------- 1 file changed, 162 insertions(+), 95 deletions(-) diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index 4a9a7fb..185851e 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -449,7 +449,7 @@ public static class Program // fits blocking-free in one kernel pipe-buffer slot. Single source of truth for both app-level // wire chunk AND kernel transfer unit; change ONLY this line when tuning. var binaryFastModePipeChunkOnly = AcBinarySerializerOptions.FastMode; - binaryFastModePipeChunkOnly.BufferWriterChunkSize = 16_384; //AsyncPipeWriterOutput.MaxChunkSize; + binaryFastModePipeChunkOnly.BufferWriterChunkSize = 4096; //AsyncPipeWriterOutput.MaxChunkSize; return new List { @@ -1081,7 +1081,12 @@ public static class Program // Long-lived multi-message receive infrastructure (set up once in ctor). private readonly AsyncPipeReaderInput _input; private readonly CancellationTokenSource _cts; - private readonly Task _drainTask; + private readonly Task _drainTask; // BG: PipeReader → input.Feed (continuous pump) + private readonly Task _consumerTask; // BG: per-iter Deserialize(input) loop, signaled by calling thread + private readonly ManualResetEventSlim _consumeRequest = new(false); + private readonly ManualResetEventSlim _consumeDone = new(false); + private object? _lastResult; // captured during VerifyRoundTrip; null in benchmark iters + private bool _captureResult; // toggle: when true, ConsumeLoop stores result; otherwise discards private bool _disposed; public string Engine => EngineAcBinary; @@ -1092,7 +1097,7 @@ public static class Program public long SetupSerializeAllocBytes { get; } public long SetupDeserializeAllocBytes { get; } public bool IsRoundTripOnly => true; - public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)"); + public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage,2-task)"); public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) { @@ -1131,24 +1136,62 @@ public static class Program // === DESERIALIZE-side setup measurement === // PipeReader wrapper + AsyncPipeReaderInput (ArrayPool rent + ManualResetEventSlim) + drain - // task scaffolding. The long-lived deserialize-side allocation that per-iter measurements - // hide today, surfaced here for comparison-vs-FreshInstance fairness. + // task + consumer task scaffolding. Two long-lived BG tasks total: drain pumps bytes from the + // kernel pipe into input; consumer drives Deserialize(input) per iter on signal. GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); var beforeDes = GC.GetAllocatedBytesForCurrentThread(); _pipeReader = PipeReader.Create(_pipeServer); - // 1× multi-message receive infrastructure: long-lived input + 1 background drain task. - // Per-iter Serialize() does its own Deserialize(input, opts) call on the calling thread — - // strictly sequential per the calling thread's loop, so the producer (drain) and consumer - // (deserialiser, on the calling thread) cannot race on the buffer. _input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true); _cts = new CancellationTokenSource(); // Drain task: pumps PipeReader → input.Feed forever (or until cancel). Single Task.Run for - // the full benchmark lifetime (NOT per iteration) — its overhead is amortised across all messages. + // the full benchmark lifetime — its overhead is amortised across all messages. _drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token)); + // Consumer task: per-iter Deserialize(input) loop. Started here once; signaled per-iter via + // _consumeRequest. Enables Ser↔Des streaming overlap — calling thread runs SerializeChunkedFramed + // while THIS task simultaneously runs Deserialize, both consuming/producing through the + // sliding-window buffer pipelined by the drain task. + _consumerTask = Task.Run(ConsumeLoop); var afterDes = GC.GetAllocatedBytesForCurrentThread(); SetupDeserializeAllocBytes = afterDes - beforeDes; } + // BG consumer: parks on _consumeRequest, runs Deserialize(_input) when signaled, signals _consumeDone. + // The Deserialize call internally blocks on the input's MRES whenever the drain hasn't yet fed enough + // bytes for the next read — that's where the streaming-pipeline overlap with the calling thread (Ser) + // happens. + private void ConsumeLoop() + { + var ct = _cts.Token; + try + { + while (true) + { + _consumeRequest.Wait(ct); + if (ct.IsCancellationRequested) return; + _consumeRequest.Reset(); + + try + { + var result = AcBinaryDeserializer.Deserialize(_input, _options); + if (_captureResult) _lastResult = result; + } + catch + { + // Swallow — calling thread sees the failure via missing/incorrect _lastResult during VerifyRoundTrip, + // or the benchmark loop just continues (timing impacted). Production teardown handled in Dispose. + } + finally + { + _consumeDone.Set(); + } + } + } + catch (OperationCanceledException) + { + // Cooperative cancel — Dispose path. Swallow. + } + } + public void Warmup(int iterations) { for (var i = 0; i < iterations; i++) @@ -1160,16 +1203,19 @@ public static class Program [MethodImpl(MethodImplOptions.NoInlining)] public void Serialize() { - // Sender: multi-message wire framing — [201][UINT16][data]...[202]. The Flush() inside - // SerializeChunkedFramed writes the [202] CHUNK_END marker and flushes the kernel buffer. + // 2-task streaming pipeline: + // 1. Calling thread signals consumer task to begin Deserialize(input). Consumer immediately + // starts; first read blocks on input's MRES because no bytes flowed yet. + // 2. Calling thread starts SerializeChunkedFramed → chunks flow through PipeWriter → kernel pipe → + // drain task (BG) feeds input.Feed → MRES pulses → consumer's Deserialize consumes bytes + // chunk by chunk. Ser↔Des truly overlap here. + // 3. Calling thread waits for _consumeDone (signaling Deserialize returned). + _consumeDone.Reset(); + _consumeRequest.Set(); + AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options); - // Receiver: synchronous Deserialize on the calling thread. Blocks (via TryAdvanceSegment's - // MRES.Wait) until the drain task has fed enough bytes for the structurally-complete graph. - // Returns when the graph is complete; finally block calls input.MessageDone() which arms - // _readPos = -1 sentinel for the next Append-cycle. Strictly sequential on the calling thread: - // the next Serialize() call's SerializeChunkedFramed only runs after this Deserialize returns. - _ = AcBinaryDeserializer.Deserialize(_input, _options); + _consumeDone.Wait(); } [MethodImpl(MethodImplOptions.NoInlining)] @@ -1180,10 +1226,19 @@ public static class Program public bool VerifyRoundTrip() { - // Round-trip one message synchronously on the calling thread. - AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options); - var result = AcBinaryDeserializer.Deserialize(_input, _options); - return result != null && DeepEqualsViaJson(_order, result); + // Use the same 2-task streaming path as the benchmark, but capture the result for graph-equality. + _captureResult = true; + try + { + Serialize(); + var result = _lastResult as TestOrder; + return result != null && DeepEqualsViaJson(_order, result); + } + finally + { + _captureResult = false; + _lastResult = null; + } } public void Dispose() @@ -1191,9 +1246,11 @@ public static class Program if (_disposed) return; _disposed = true; - // Cancel drain task → DrainFromAsync exits → input.Complete() in its finally. + // Cancel drain + consumer tasks → both exit. Pulse _consumeRequest in case consumer is parked. try { _cts.Cancel(); } catch { /* swallow on teardown */ } + try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ } try { _drainTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } + try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } // Complete writer + dispose pipe lifecycle. try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } @@ -1201,16 +1258,17 @@ public static class Program try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ } try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ } try { _input.Dispose(); } catch { /* swallow on teardown */ } + try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ } + try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ } try { _cts.Dispose(); } catch { /* swallow on teardown */ } } } /// /// Raw byte[] over a long-lived NamedPipe — NO chunk-framing, NO AsyncPipeReaderInput, - /// NO sliding-window buffer. Calling thread serialises + writes; a long-lived background read-thread - /// drains the pipe sync; calling thread deserialises. The background-read mirrors - /// 's drain task — needed to avoid a kernel-buffer-full deadlock - /// when bytes.Length > inBufferSize (Large/Repeated/Deep payloads on a 16 KB pipe-buffer). + /// NO sliding-window buffer. Calling thread serialises + writes; a long-lived background consumer task + /// reads and deserialises. Two-task pattern enables Ser↔Read overlap (kernel-pipe-pipelined) AND + /// avoids the kernel-buffer-full deadlock when bytes.Length > inBufferSize. /// /// Side-by-side with (chunked-framed AsyncPipe stack) this /// isolates two cost components on the SAME kernel-pipe transport with the SAME inBufferSize: @@ -1219,11 +1277,12 @@ public static class Program /// overhead (WriteFile / ReadFile syscalls + IRP queueing + buffer-copy + thread-handoff). /// This row vs (chunked-framed) — pure /// AsyncPipe-framework overhead (chunk header writes + sliding-window Feed + MRES wait inside - /// AsyncPipeReaderInput). Both rows have a long-lived read-thread; only the framing differs. + /// AsyncPipeReaderInput) AND the streaming-pipeline benefit of intra-message Ser↔Des overlap (which + /// raw lacks — raw can only Ser↔Read overlap, with Des sequential after Read completes). /// /// Per-iter byte[] allocation from AcBinarySerializer.Serialize is part of the cost (matches /// 's API contract); the receive-side scratch buffer is also allocated per-iter - /// on the read-thread (counted via GC.GetTotalAllocatedBytes in MeasureAllocationTotal). + /// on the consumer-task (counted via GC.GetTotalAllocatedBytes in MeasureAllocationTotal). /// private sealed class AcBinaryNamedPipeRawByteArrayBenchmark : ISerializerBenchmark, IDisposable { @@ -1235,16 +1294,18 @@ public static class Program private readonly NamedPipeServerStream _pipeServer; private readonly NamedPipeClientStream _pipeClient; - // Long-lived read-thread infrastructure — mirrors AcBinaryNamedPipeBenchmark's _drainTask. Needed - // to prevent kernel-buffer-full deadlock: when bytes.Length > inBufferSize, _pipeClient.Write - // blocks until the kernel buffer drains, but the drain only happens when SOMETHING reads from the - // server end. Single-thread Write→Read sequencing → deadlock. Background-read overlaps the read. + // Long-lived consumer-task infrastructure (Read + Deserialize on BG thread, signaled per iter). + // Mirrors AcBinaryNamedPipeBenchmark's drain+consumer pair, but raw byte[] doesn't have an + // intermediate sliding-window buffer, so Read+Des happen sequentially in one BG task: Read N bytes + // → Deserialize(bytes) → signal done. Calling thread's Ser↔Write overlaps with this BG Read+Des + // through kernel-pipe pipelining. private readonly CancellationTokenSource _cts; - private readonly Task _readTask; - private readonly ManualResetEventSlim _readRequest = new(false); - private readonly ManualResetEventSlim _readDone = new(false); + private readonly Task _consumerTask; + private readonly ManualResetEventSlim _consumeRequest = new(false); + private readonly ManualResetEventSlim _consumeDone = new(false); private int _pendingReadSize; - private byte[]? _receivedSlot; + private object? _lastResult; // captured during VerifyRoundTrip; null in benchmark iters + private bool _captureResult; // toggle: when true, ConsumerLoop stores result; otherwise discards private bool _disposed; public string Engine => EngineAcBinary; @@ -1255,7 +1316,7 @@ public static class Program public long SetupSerializeAllocBytes { get; } public long SetupDeserializeAllocBytes { get; } public bool IsRoundTripOnly => true; - public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(raw,bg-read)"); + public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(raw,2-task)"); public AcBinaryNamedPipeRawByteArrayBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) { @@ -1288,53 +1349,59 @@ public static class Program SetupSerializeAllocBytes = afterSer - beforeSer; // === DESERIALIZE-side setup measurement === - // 1× background read-thread + 2× MRES (request / done) + cancellation source. Matches the - // chunked benchmark's deserialize-side setup cost shape (it has 1× drain Task + AsyncPipeReaderInput - // with 1× MRES + ArrayPool rent). + // 1× background consumer-task + 2× MRES (request / done) + cancellation source. Matches the + // chunked benchmark's deserialize-side setup cost shape. GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); var beforeDes = GC.GetAllocatedBytesForCurrentThread(); _cts = new CancellationTokenSource(); - _readTask = Task.Run(ReadLoop); + _consumerTask = Task.Run(ConsumerLoop); var afterDes = GC.GetAllocatedBytesForCurrentThread(); SetupDeserializeAllocBytes = afterDes - beforeDes; } - // Long-lived read-loop on a background thread. Pattern: wait for request → drain N bytes → publish - // result via _receivedSlot → signal done. The calling thread provides the size via _pendingReadSize - // BEFORE setting _readRequest, so the read-thread always knows how much to read. - private void ReadLoop() + // BG consumer: parks on _consumeRequest, reads N bytes from pipe, runs Deserialize(bytes), signals + // _consumeDone. The Read overlaps with the calling thread's Write through the kernel-pipe; Des happens + // sequentially after Read completes (raw byte[] needs the full message to deserialize). + private void ConsumerLoop() { var ct = _cts.Token; try { - while (!ct.IsCancellationRequested) + while (true) { - _readRequest.Wait(ct); - if (ct.IsCancellationRequested) break; - _readRequest.Reset(); + _consumeRequest.Wait(ct); + if (ct.IsCancellationRequested) return; + _consumeRequest.Reset(); - var size = _pendingReadSize; - var bytes = new byte[size]; // per-iter alloc — counted by MeasureAllocationTotal - var totalRead = 0; - while (totalRead < size) + try { - var n = _pipeServer.Read(bytes, totalRead, size - totalRead); - if (n == 0) break; // pipe closed / EOF — partial read returned to caller - totalRead += n; + var size = _pendingReadSize; + var bytes = new byte[size]; // per-iter alloc — counted by MeasureAllocationTotal + var totalRead = 0; + while (totalRead < size) + { + var n = _pipeServer.Read(bytes, totalRead, size - totalRead); + if (n == 0) break; // pipe closed / EOF — partial read swallowed + totalRead += n; + } + var result = AcBinaryDeserializer.Deserialize(bytes, _options); + if (_captureResult) _lastResult = result; + } + catch + { + // Swallow — calling thread sees the failure via missing/incorrect _lastResult during VerifyRoundTrip, + // or the benchmark loop just continues (timing impacted). Production teardown handled in Dispose. + } + finally + { + _consumeDone.Set(); } - _receivedSlot = bytes; - _readDone.Set(); } } catch (OperationCanceledException) { // Cooperative cancel — Dispose path. Swallow. } - catch - { - // Any other error during teardown → swallow; the calling thread's _readDone.Wait() - // would then time out, surfaced by the dispose timeout below. - } } public void Warmup(int iterations) @@ -1348,27 +1415,26 @@ public static class Program [MethodImpl(MethodImplOptions.NoInlining)] public void Serialize() { - // Sender: serialize → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark API contract). + // 2-task streaming pipeline: + // 1. Calling thread serialises → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark contract). + // 2. Calling thread hands off expected size + signals consumer task. Consumer task starts Read loop + // on the pipe (BG thread). Calling thread proceeds to Write the bytes — Read and Write overlap + // through the kernel-pipe (kernel buffer fills, drains as consumer reads, sender resumes). + // 3. Calling thread waits for _consumeDone (consumer task finished Read+Des). + // + // Note: unlike chunked, raw byte[] cannot do Ser↔Des overlap (Des needs the full bytes before + // starting). Only Write↔Read overlaps here. The Des sequence on BG thread is: Read full bytes → + // Des the full graph → signal done. This is the architectural difference between raw and chunked. var bytes = AcBinarySerializer.Serialize(_order, _options); - // Hand off the expected size to the read-thread BEFORE signalling — read-thread reads - // _pendingReadSize after _readRequest.Wait returns, so write-then-set ordering is sufficient - // (MRES.Set has release semantics; MRES.Wait has acquire). _pendingReadSize = bytes.Length; - _readDone.Reset(); - _readRequest.Set(); + _consumeDone.Reset(); + _consumeRequest.Set(); - // Sync write on calling thread, OVERLAPPING with the read-thread's Read loop. The kernel - // buffer may fill (bytes.Length > inBufferSize) — Write blocks; the read-thread drains; - // Write resumes. Total wall time ≈ Write-bound or Read-bound, whichever is slower. _pipeClient.Write(bytes, 0, bytes.Length); _pipeClient.Flush(); - // Wait for the read-thread to finish accumulating the message. - _readDone.Wait(); - var receivedBytes = _receivedSlot!; - - _ = AcBinaryDeserializer.Deserialize(receivedBytes, _options); + _consumeDone.Wait(); } [MethodImpl(MethodImplOptions.NoInlining)] @@ -1379,18 +1445,19 @@ public static class Program public bool VerifyRoundTrip() { - // Inlined version of Serialize() that captures the deserialised graph (Serialize()'s - // discard-pattern is correct for the timed loop but useless for verification). - var bytes = AcBinarySerializer.Serialize(_order, _options); - _pendingReadSize = bytes.Length; - _readDone.Reset(); - _readRequest.Set(); - _pipeClient.Write(bytes, 0, bytes.Length); - _pipeClient.Flush(); - _readDone.Wait(); - var received = _receivedSlot!; - var result = AcBinaryDeserializer.Deserialize(received, _options); - return result != null && DeepEqualsViaJson(_order, result); + // Use the same 2-task streaming path as the benchmark, but capture the result for graph-equality. + _captureResult = true; + try + { + Serialize(); + var result = _lastResult as TestOrder; + return result != null && DeepEqualsViaJson(_order, result); + } + finally + { + _captureResult = false; + _lastResult = null; + } } public void Dispose() @@ -1398,16 +1465,16 @@ public static class Program if (_disposed) return; _disposed = true; - // Cancel the read-loop → ReadLoop exits its Wait via OperationCanceledException. + // Cancel the consumer task → ConsumerLoop exits its Wait via OperationCanceledException. try { _cts.Cancel(); } catch { /* swallow on teardown */ } - try { _readRequest.Set(); } catch { /* nudge in case Wait is parked */ } - try { _readTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } + try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ } + try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } // Symmetric teardown — close client first (writer side), then server. try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ } try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ } - try { _readRequest.Dispose(); } catch { /* swallow on teardown */ } - try { _readDone.Dispose(); } catch { /* swallow on teardown */ } + try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ } + try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ } try { _cts.Dispose(); } catch { /* swallow on teardown */ } } }