[LOADED_DOCS: 3 files, no new loads]

Refactor pipe benchmarks to 2-task streaming model

Refactored AcBinaryNamedPipeBenchmark and AcBinaryNamedPipeRawByteArrayBenchmark to use a two-task (producer/consumer) streaming pipeline for deserialization, enabling true Ser↔Des overlap. Reduced BufferWriterChunkSize from 16K to 4K. Updated synchronization, cleanup, and comments to reflect the new architecture and improve performance comparison between chunked and raw byte[] modes.
This commit is contained in:
Loretta 2026-05-02 11:55:46 +02:00
parent a537f18294
commit 05f90a5639
1 changed files with 162 additions and 95 deletions

View File

@ -449,7 +449,7 @@ public static class Program
// fits blocking-free in one kernel pipe-buffer slot. Single source of truth for both app-level // fits blocking-free in one kernel pipe-buffer slot. Single source of truth for both app-level
// wire chunk AND kernel transfer unit; change ONLY this line when tuning. // wire chunk AND kernel transfer unit; change ONLY this line when tuning.
var binaryFastModePipeChunkOnly = AcBinarySerializerOptions.FastMode; var binaryFastModePipeChunkOnly = AcBinarySerializerOptions.FastMode;
binaryFastModePipeChunkOnly.BufferWriterChunkSize = 16_384; //AsyncPipeWriterOutput.MaxChunkSize; binaryFastModePipeChunkOnly.BufferWriterChunkSize = 4096; //AsyncPipeWriterOutput.MaxChunkSize;
return new List<ISerializerBenchmark> return new List<ISerializerBenchmark>
{ {
@ -1081,7 +1081,12 @@ public static class Program
// Long-lived multi-message receive infrastructure (set up once in ctor). // Long-lived multi-message receive infrastructure (set up once in ctor).
private readonly AsyncPipeReaderInput _input; private readonly AsyncPipeReaderInput _input;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
private readonly Task _drainTask; private readonly Task _drainTask; // BG: PipeReader → input.Feed (continuous pump)
private readonly Task _consumerTask; // BG: per-iter Deserialize<T>(input) loop, signaled by calling thread
private readonly ManualResetEventSlim _consumeRequest = new(false);
private readonly ManualResetEventSlim _consumeDone = new(false);
private object? _lastResult; // captured during VerifyRoundTrip; null in benchmark iters
private bool _captureResult; // toggle: when true, ConsumeLoop stores result; otherwise discards
private bool _disposed; private bool _disposed;
public string Engine => EngineAcBinary; public string Engine => EngineAcBinary;
@ -1092,7 +1097,7 @@ public static class Program
public long SetupSerializeAllocBytes { get; } public long SetupSerializeAllocBytes { get; }
public long SetupDeserializeAllocBytes { get; } public long SetupDeserializeAllocBytes { get; }
public bool IsRoundTripOnly => true; public bool IsRoundTripOnly => true;
public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)"); public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage,2-task)");
public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{ {
@ -1131,24 +1136,62 @@ public static class Program
// === DESERIALIZE-side setup measurement === // === DESERIALIZE-side setup measurement ===
// PipeReader wrapper + AsyncPipeReaderInput (ArrayPool rent + ManualResetEventSlim) + drain // PipeReader wrapper + AsyncPipeReaderInput (ArrayPool rent + ManualResetEventSlim) + drain
// task scaffolding. The long-lived deserialize-side allocation that per-iter measurements // task + consumer task scaffolding. Two long-lived BG tasks total: drain pumps bytes from the
// hide today, surfaced here for comparison-vs-FreshInstance fairness. // kernel pipe into input; consumer drives Deserialize<T>(input) per iter on signal.
GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
var beforeDes = GC.GetAllocatedBytesForCurrentThread(); var beforeDes = GC.GetAllocatedBytesForCurrentThread();
_pipeReader = PipeReader.Create(_pipeServer); _pipeReader = PipeReader.Create(_pipeServer);
// 1× multi-message receive infrastructure: long-lived input + 1 background drain task.
// Per-iter Serialize() does its own Deserialize<T>(input, opts) call on the calling thread —
// strictly sequential per the calling thread's loop, so the producer (drain) and consumer
// (deserialiser, on the calling thread) cannot race on the buffer.
_input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true); _input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true);
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
// Drain task: pumps PipeReader → input.Feed forever (or until cancel). Single Task.Run for // Drain task: pumps PipeReader → input.Feed forever (or until cancel). Single Task.Run for
// the full benchmark lifetime (NOT per iteration) — its overhead is amortised across all messages. // the full benchmark lifetime — its overhead is amortised across all messages.
_drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token)); _drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token));
// Consumer task: per-iter Deserialize<T>(input) loop. Started here once; signaled per-iter via
// _consumeRequest. Enables Ser↔Des streaming overlap — calling thread runs SerializeChunkedFramed
// while THIS task simultaneously runs Deserialize<T>, both consuming/producing through the
// sliding-window buffer pipelined by the drain task.
_consumerTask = Task.Run(ConsumeLoop);
var afterDes = GC.GetAllocatedBytesForCurrentThread(); var afterDes = GC.GetAllocatedBytesForCurrentThread();
SetupDeserializeAllocBytes = afterDes - beforeDes; SetupDeserializeAllocBytes = afterDes - beforeDes;
} }
// BG consumer: parks on _consumeRequest, runs Deserialize<T>(_input) when signaled, signals _consumeDone.
// The Deserialize call internally blocks on the input's MRES whenever the drain hasn't yet fed enough
// bytes for the next read — that's where the streaming-pipeline overlap with the calling thread (Ser)
// happens.
private void ConsumeLoop()
{
var ct = _cts.Token;
try
{
while (true)
{
_consumeRequest.Wait(ct);
if (ct.IsCancellationRequested) return;
_consumeRequest.Reset();
try
{
var result = AcBinaryDeserializer.Deserialize<TestOrder>(_input, _options);
if (_captureResult) _lastResult = result;
}
catch
{
// Swallow — calling thread sees the failure via missing/incorrect _lastResult during VerifyRoundTrip,
// or the benchmark loop just continues (timing impacted). Production teardown handled in Dispose.
}
finally
{
_consumeDone.Set();
}
}
}
catch (OperationCanceledException)
{
// Cooperative cancel — Dispose path. Swallow.
}
}
public void Warmup(int iterations) public void Warmup(int iterations)
{ {
for (var i = 0; i < iterations; i++) for (var i = 0; i < iterations; i++)
@ -1160,16 +1203,19 @@ public static class Program
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
public void Serialize() public void Serialize()
{ {
// Sender: multi-message wire framing — [201][UINT16][data]...[202]. The Flush() inside // 2-task streaming pipeline:
// SerializeChunkedFramed writes the [202] CHUNK_END marker and flushes the kernel buffer. // 1. Calling thread signals consumer task to begin Deserialize<T>(input). Consumer immediately
// starts; first read blocks on input's MRES because no bytes flowed yet.
// 2. Calling thread starts SerializeChunkedFramed → chunks flow through PipeWriter → kernel pipe →
// drain task (BG) feeds input.Feed → MRES pulses → consumer's Deserialize<T> consumes bytes
// chunk by chunk. Ser↔Des truly overlap here.
// 3. Calling thread waits for _consumeDone (signaling Deserialize<T> returned).
_consumeDone.Reset();
_consumeRequest.Set();
AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options); AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options);
// Receiver: synchronous Deserialize<T> on the calling thread. Blocks (via TryAdvanceSegment's _consumeDone.Wait();
// MRES.Wait) until the drain task has fed enough bytes for the structurally-complete graph.
// Returns when the graph is complete; finally block calls input.MessageDone() which arms
// _readPos = -1 sentinel for the next Append-cycle. Strictly sequential on the calling thread:
// the next Serialize() call's SerializeChunkedFramed only runs after this Deserialize<T> returns.
_ = AcBinaryDeserializer.Deserialize<TestOrder>(_input, _options);
} }
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
@ -1180,20 +1226,31 @@ public static class Program
public bool VerifyRoundTrip() public bool VerifyRoundTrip()
{ {
// Round-trip one message synchronously on the calling thread. // Use the same 2-task streaming path as the benchmark, but capture the result for graph-equality.
AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options); _captureResult = true;
var result = AcBinaryDeserializer.Deserialize<TestOrder>(_input, _options); try
{
Serialize();
var result = _lastResult as TestOrder;
return result != null && DeepEqualsViaJson(_order, result); return result != null && DeepEqualsViaJson(_order, result);
} }
finally
{
_captureResult = false;
_lastResult = null;
}
}
public void Dispose() public void Dispose()
{ {
if (_disposed) return; if (_disposed) return;
_disposed = true; _disposed = true;
// Cancel drain task → DrainFromAsync exits → input.Complete() in its finally. // Cancel drain + consumer tasks → both exit. Pulse _consumeRequest in case consumer is parked.
try { _cts.Cancel(); } catch { /* swallow on teardown */ } try { _cts.Cancel(); } catch { /* swallow on teardown */ }
try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ }
try { _drainTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } try { _drainTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
// Complete writer + dispose pipe lifecycle. // Complete writer + dispose pipe lifecycle.
try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
@ -1201,16 +1258,17 @@ public static class Program
try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ } try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ } try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
try { _input.Dispose(); } catch { /* swallow on teardown */ } try { _input.Dispose(); } catch { /* swallow on teardown */ }
try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ }
try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ }
try { _cts.Dispose(); } catch { /* swallow on teardown */ } try { _cts.Dispose(); } catch { /* swallow on teardown */ }
} }
} }
/// <summary> /// <summary>
/// Raw <c>byte[]</c> over a long-lived NamedPipe — NO chunk-framing, NO <c>AsyncPipeReaderInput</c>, /// Raw <c>byte[]</c> over a long-lived NamedPipe — NO chunk-framing, NO <c>AsyncPipeReaderInput</c>,
/// NO sliding-window buffer. Calling thread serialises + writes; a long-lived background read-thread /// NO sliding-window buffer. Calling thread serialises + writes; a long-lived background consumer task
/// drains the pipe sync; calling thread deserialises. The background-read mirrors /// reads and deserialises. Two-task pattern enables Ser↔Read overlap (kernel-pipe-pipelined) AND
/// <see cref="AcBinaryNamedPipeBenchmark"/>'s drain task — needed to avoid a kernel-buffer-full deadlock /// avoids the kernel-buffer-full deadlock when <c>bytes.Length &gt; inBufferSize</c>.
/// when <c>bytes.Length &gt; inBufferSize</c> (Large/Repeated/Deep payloads on a 16 KB pipe-buffer).
/// ///
/// Side-by-side with <see cref="AcBinaryNamedPipeBenchmark"/> (chunked-framed AsyncPipe stack) this /// Side-by-side with <see cref="AcBinaryNamedPipeBenchmark"/> (chunked-framed AsyncPipe stack) this
/// isolates two cost components on the SAME kernel-pipe transport with the SAME <c>inBufferSize</c>: /// isolates two cost components on the SAME kernel-pipe transport with the SAME <c>inBufferSize</c>:
@ -1219,11 +1277,12 @@ public static class Program
/// overhead (WriteFile / ReadFile syscalls + IRP queueing + buffer-copy + thread-handoff).</description></item> /// overhead (WriteFile / ReadFile syscalls + IRP queueing + buffer-copy + thread-handoff).</description></item>
/// <item><description><b>This row vs <see cref="AcBinaryNamedPipeBenchmark"/> (chunked-framed)</b> — pure /// <item><description><b>This row vs <see cref="AcBinaryNamedPipeBenchmark"/> (chunked-framed)</b> — pure
/// AsyncPipe-framework overhead (chunk header writes + sliding-window <c>Feed</c> + MRES wait inside /// AsyncPipe-framework overhead (chunk header writes + sliding-window <c>Feed</c> + MRES wait inside
/// <c>AsyncPipeReaderInput</c>). Both rows have a long-lived read-thread; only the framing differs.</description></item> /// <c>AsyncPipeReaderInput</c>) AND the streaming-pipeline benefit of intra-message Ser↔Des overlap (which
/// raw lacks — raw can only Ser↔Read overlap, with Des sequential after Read completes).</description></item>
/// </list> /// </list>
/// Per-iter <c>byte[]</c> allocation from <c>AcBinarySerializer.Serialize</c> is part of the cost (matches /// Per-iter <c>byte[]</c> allocation from <c>AcBinarySerializer.Serialize</c> is part of the cost (matches
/// <see cref="AcBinaryBenchmark"/>'s API contract); the receive-side scratch buffer is also allocated per-iter /// <see cref="AcBinaryBenchmark"/>'s API contract); the receive-side scratch buffer is also allocated per-iter
/// on the read-thread (counted via <c>GC.GetTotalAllocatedBytes</c> in <c>MeasureAllocationTotal</c>). /// on the consumer-task (counted via <c>GC.GetTotalAllocatedBytes</c> in <c>MeasureAllocationTotal</c>).
/// </summary> /// </summary>
private sealed class AcBinaryNamedPipeRawByteArrayBenchmark : ISerializerBenchmark, IDisposable private sealed class AcBinaryNamedPipeRawByteArrayBenchmark : ISerializerBenchmark, IDisposable
{ {
@ -1235,16 +1294,18 @@ public static class Program
private readonly NamedPipeServerStream _pipeServer; private readonly NamedPipeServerStream _pipeServer;
private readonly NamedPipeClientStream _pipeClient; private readonly NamedPipeClientStream _pipeClient;
// Long-lived read-thread infrastructure — mirrors AcBinaryNamedPipeBenchmark's _drainTask. Needed // Long-lived consumer-task infrastructure (Read + Deserialize on BG thread, signaled per iter).
// to prevent kernel-buffer-full deadlock: when bytes.Length > inBufferSize, _pipeClient.Write // Mirrors AcBinaryNamedPipeBenchmark's drain+consumer pair, but raw byte[] doesn't have an
// blocks until the kernel buffer drains, but the drain only happens when SOMETHING reads from the // intermediate sliding-window buffer, so Read+Des happen sequentially in one BG task: Read N bytes
// server end. Single-thread Write→Read sequencing → deadlock. Background-read overlaps the read. // → Deserialize<T>(bytes) → signal done. Calling thread's Ser↔Write overlaps with this BG Read+Des
// through kernel-pipe pipelining.
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
private readonly Task _readTask; private readonly Task _consumerTask;
private readonly ManualResetEventSlim _readRequest = new(false); private readonly ManualResetEventSlim _consumeRequest = new(false);
private readonly ManualResetEventSlim _readDone = new(false); private readonly ManualResetEventSlim _consumeDone = new(false);
private int _pendingReadSize; private int _pendingReadSize;
private byte[]? _receivedSlot; private object? _lastResult; // captured during VerifyRoundTrip; null in benchmark iters
private bool _captureResult; // toggle: when true, ConsumerLoop stores result; otherwise discards
private bool _disposed; private bool _disposed;
public string Engine => EngineAcBinary; public string Engine => EngineAcBinary;
@ -1255,7 +1316,7 @@ public static class Program
public long SetupSerializeAllocBytes { get; } public long SetupSerializeAllocBytes { get; }
public long SetupDeserializeAllocBytes { get; } public long SetupDeserializeAllocBytes { get; }
public bool IsRoundTripOnly => true; public bool IsRoundTripOnly => true;
public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(raw,bg-read)"); public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(raw,2-task)");
public AcBinaryNamedPipeRawByteArrayBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) public AcBinaryNamedPipeRawByteArrayBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{ {
@ -1288,53 +1349,59 @@ public static class Program
SetupSerializeAllocBytes = afterSer - beforeSer; SetupSerializeAllocBytes = afterSer - beforeSer;
// === DESERIALIZE-side setup measurement === // === DESERIALIZE-side setup measurement ===
// 1× background read-thread + 2× MRES (request / done) + cancellation source. Matches the // 1× background consumer-task + 2× MRES (request / done) + cancellation source. Matches the
// chunked benchmark's deserialize-side setup cost shape (it has 1× drain Task + AsyncPipeReaderInput // chunked benchmark's deserialize-side setup cost shape.
// with 1× MRES + ArrayPool rent).
GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect(); GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
var beforeDes = GC.GetAllocatedBytesForCurrentThread(); var beforeDes = GC.GetAllocatedBytesForCurrentThread();
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
_readTask = Task.Run(ReadLoop); _consumerTask = Task.Run(ConsumerLoop);
var afterDes = GC.GetAllocatedBytesForCurrentThread(); var afterDes = GC.GetAllocatedBytesForCurrentThread();
SetupDeserializeAllocBytes = afterDes - beforeDes; SetupDeserializeAllocBytes = afterDes - beforeDes;
} }
// Long-lived read-loop on a background thread. Pattern: wait for request → drain N bytes → publish // BG consumer: parks on _consumeRequest, reads N bytes from pipe, runs Deserialize<T>(bytes), signals
// result via _receivedSlot → signal done. The calling thread provides the size via _pendingReadSize // _consumeDone. The Read overlaps with the calling thread's Write through the kernel-pipe; Des happens
// BEFORE setting _readRequest, so the read-thread always knows how much to read. // sequentially after Read completes (raw byte[] needs the full message to deserialize).
private void ReadLoop() private void ConsumerLoop()
{ {
var ct = _cts.Token; var ct = _cts.Token;
try try
{ {
while (!ct.IsCancellationRequested) while (true)
{ {
_readRequest.Wait(ct); _consumeRequest.Wait(ct);
if (ct.IsCancellationRequested) break; if (ct.IsCancellationRequested) return;
_readRequest.Reset(); _consumeRequest.Reset();
try
{
var size = _pendingReadSize; var size = _pendingReadSize;
var bytes = new byte[size]; // per-iter alloc — counted by MeasureAllocationTotal var bytes = new byte[size]; // per-iter alloc — counted by MeasureAllocationTotal
var totalRead = 0; var totalRead = 0;
while (totalRead < size) while (totalRead < size)
{ {
var n = _pipeServer.Read(bytes, totalRead, size - totalRead); var n = _pipeServer.Read(bytes, totalRead, size - totalRead);
if (n == 0) break; // pipe closed / EOF — partial read returned to caller if (n == 0) break; // pipe closed / EOF — partial read swallowed
totalRead += n; totalRead += n;
} }
_receivedSlot = bytes; var result = AcBinaryDeserializer.Deserialize<TestOrder>(bytes, _options);
_readDone.Set(); if (_captureResult) _lastResult = result;
}
catch
{
// Swallow — calling thread sees the failure via missing/incorrect _lastResult during VerifyRoundTrip,
// or the benchmark loop just continues (timing impacted). Production teardown handled in Dispose.
}
finally
{
_consumeDone.Set();
}
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
// Cooperative cancel — Dispose path. Swallow. // Cooperative cancel — Dispose path. Swallow.
} }
catch
{
// Any other error during teardown → swallow; the calling thread's _readDone.Wait()
// would then time out, surfaced by the dispose timeout below.
}
} }
public void Warmup(int iterations) public void Warmup(int iterations)
@ -1348,27 +1415,26 @@ public static class Program
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
public void Serialize() public void Serialize()
{ {
// Sender: serialize → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark API contract). // 2-task streaming pipeline:
// 1. Calling thread serialises → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark contract).
// 2. Calling thread hands off expected size + signals consumer task. Consumer task starts Read loop
// on the pipe (BG thread). Calling thread proceeds to Write the bytes — Read and Write overlap
// through the kernel-pipe (kernel buffer fills, drains as consumer reads, sender resumes).
// 3. Calling thread waits for _consumeDone (consumer task finished Read+Des).
//
// Note: unlike chunked, raw byte[] cannot do Ser↔Des overlap (Des needs the full bytes before
// starting). Only Write↔Read overlaps here. The Des sequence on BG thread is: Read full bytes →
// Des the full graph → signal done. This is the architectural difference between raw and chunked.
var bytes = AcBinarySerializer.Serialize(_order, _options); var bytes = AcBinarySerializer.Serialize(_order, _options);
// Hand off the expected size to the read-thread BEFORE signalling — read-thread reads
// _pendingReadSize after _readRequest.Wait returns, so write-then-set ordering is sufficient
// (MRES.Set has release semantics; MRES.Wait has acquire).
_pendingReadSize = bytes.Length; _pendingReadSize = bytes.Length;
_readDone.Reset(); _consumeDone.Reset();
_readRequest.Set(); _consumeRequest.Set();
// Sync write on calling thread, OVERLAPPING with the read-thread's Read loop. The kernel
// buffer may fill (bytes.Length > inBufferSize) — Write blocks; the read-thread drains;
// Write resumes. Total wall time ≈ Write-bound or Read-bound, whichever is slower.
_pipeClient.Write(bytes, 0, bytes.Length); _pipeClient.Write(bytes, 0, bytes.Length);
_pipeClient.Flush(); _pipeClient.Flush();
// Wait for the read-thread to finish accumulating the message. _consumeDone.Wait();
_readDone.Wait();
var receivedBytes = _receivedSlot!;
_ = AcBinaryDeserializer.Deserialize<TestOrder>(receivedBytes, _options);
} }
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
@ -1379,35 +1445,36 @@ public static class Program
public bool VerifyRoundTrip() public bool VerifyRoundTrip()
{ {
// Inlined version of Serialize() that captures the deserialised graph (Serialize()'s // Use the same 2-task streaming path as the benchmark, but capture the result for graph-equality.
// discard-pattern is correct for the timed loop but useless for verification). _captureResult = true;
var bytes = AcBinarySerializer.Serialize(_order, _options); try
_pendingReadSize = bytes.Length; {
_readDone.Reset(); Serialize();
_readRequest.Set(); var result = _lastResult as TestOrder;
_pipeClient.Write(bytes, 0, bytes.Length);
_pipeClient.Flush();
_readDone.Wait();
var received = _receivedSlot!;
var result = AcBinaryDeserializer.Deserialize<TestOrder>(received, _options);
return result != null && DeepEqualsViaJson(_order, result); return result != null && DeepEqualsViaJson(_order, result);
} }
finally
{
_captureResult = false;
_lastResult = null;
}
}
public void Dispose() public void Dispose()
{ {
if (_disposed) return; if (_disposed) return;
_disposed = true; _disposed = true;
// Cancel the read-loop → ReadLoop exits its Wait via OperationCanceledException. // Cancel the consumer task → ConsumerLoop exits its Wait via OperationCanceledException.
try { _cts.Cancel(); } catch { /* swallow on teardown */ } try { _cts.Cancel(); } catch { /* swallow on teardown */ }
try { _readRequest.Set(); } catch { /* nudge in case Wait is parked */ } try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ }
try { _readTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
// Symmetric teardown — close client first (writer side), then server. // Symmetric teardown — close client first (writer side), then server.
try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ } try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ } try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
try { _readRequest.Dispose(); } catch { /* swallow on teardown */ } try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ }
try { _readDone.Dispose(); } catch { /* swallow on teardown */ } try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ }
try { _cts.Dispose(); } catch { /* swallow on teardown */ } try { _cts.Dispose(); } catch { /* swallow on teardown */ }
} }
} }