[LOADED_DOCS: 3 files, no new loads]

Refactor AcBinary NamedPipe to long-lived multi-message mode

Refactored the AcBinary NamedPipe benchmark to use a single long-lived AsyncPipeReaderInput in multi-message mode, with one background drain task and synchronous deserialization per message. Buffer recycling is now signaled by the consumer via a new MessageDone() method, called in the deserializer's finally block, preventing producer-consumer races. Added IsCompleted property to AsyncPipeReaderInput. Increased release-mode benchmark iteration counts. Updated documentation and comments to reflect the new architecture and rationale.
This commit is contained in:
Loretta 2026-04-30 21:14:38 +02:00
parent 204b361748
commit 5561246e8c
3 changed files with 129 additions and 72 deletions

View File

@ -69,9 +69,9 @@ public static class Program
private const int TestIterations = 1; private const int TestIterations = 1;
private const int BenchmarkSamples = 1; // Debug: single sample, fast iteration private const int BenchmarkSamples = 1; // Debug: single sample, fast iteration
#else #else
private static int WarmupIterations = 5; private static int WarmupIterations = 1000; //5000
private static int TestIterations = 1; private static int TestIterations = 1000; //1000
private static int BenchmarkSamples = 1; private static int BenchmarkSamples = 3;
#endif #endif
public static void Main(string[] args) public static void Main(string[] args)
@ -817,27 +817,29 @@ public static class Program
/// (<see cref="AcBinarySerializer.SerializeChunked{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/> /// (<see cref="AcBinarySerializer.SerializeChunked{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>
/// + <see cref="AsyncPipeReaderInput"/> + <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/>). /// + <see cref="AsyncPipeReaderInput"/> + <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/>).
/// Mirrors what a real consumer (e.g. <c>DeserializeFromPipeReaderAsync</c>) does per message: /// Mirrors what a real consumer (e.g. <c>DeserializeFromPipeReaderAsync</c>) does per message:
/// fresh input + 2 background tasks (drain + deserialize) per iteration, on top of a long-lived NamedPipe. /// long-lived <see cref="AsyncPipeReaderInput"/> with multi-message wire framing on top of a long-lived NamedPipe.
/// ///
/// <para><b>Architecture</b>:</para> /// <para><b>Architecture</b>:</para>
/// <list type="bullet"> /// <list type="bullet">
/// <item>Constructor (NOT timed): sets up <see cref="NamedPipeServerStream"/> + <see cref="NamedPipeClientStream"/>, /// <item>Constructor (NOT timed): sets up <see cref="NamedPipeServerStream"/> + <see cref="NamedPipeClientStream"/>,
/// waits for connection, creates one long-lived <see cref="System.IO.Pipelines.PipeWriter"/> / /// waits for connection, creates one long-lived <see cref="System.IO.Pipelines.PipeWriter"/> /
/// <see cref="System.IO.Pipelines.PipeReader"/> pair on top of the streams.</item> /// <see cref="System.IO.Pipelines.PipeReader"/> pair, ONE long-lived <see cref="AsyncPipeReaderInput"/>
/// <item>Per-iteration <see cref="Serialize"/> (timed): sender writes one message via /// in <c>multiMessage = true</c> mode, ONE drain Task that pumps <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/>
/// <see cref="AcBinarySerializer.SerializeChunked{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/> /// forever, and ONE deserialize Task that loops <c>AcBinaryDeserializer.Deserialize&lt;T&gt;(input, opts)</c>
/// + <c>FlushAsync</c> on the long-lived pipeWriter; receiver creates a per-message /// producing into a <see cref="System.Threading.Channels.Channel{T}"/>.</item>
/// <see cref="AsyncPipeReaderInput"/>, spawns a drain Task and a deserialize Task, awaits the deserialize result, /// <item>Per-iteration <see cref="Serialize"/> (timed): sender writes via
/// then cancels the drain (which calls <c>input.Complete()</c> in its <c>finally</c>) and disposes the input.</item> /// <see cref="AcBinarySerializer.SerializeChunkedFramed{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>
/// — multi-message wire (<c>[201][UINT16][data]...[202]</c>); the <c>[202]</c> end marker arms the input's
/// <c>_readPos = -1</c> sentinel, so the next message's first <c>AppendToBuffer</c> recycles the buffer to 0.
/// Then receiver awaits the channel for the deserialized result.</item>
/// <item><see cref="Deserialize"/> is a no-op (full round-trip captured in <see cref="Serialize"/>); /// <item><see cref="Deserialize"/> is a no-op (full round-trip captured in <see cref="Serialize"/>);
/// <see cref="IsRoundTripOnly"/>=true → Ser ms / SerAlloc oszlopok N/A, RT ms = full round-trip.</item> /// <see cref="IsRoundTripOnly"/>=true → Ser ms / SerAlloc oszlopok N/A, RT ms = full round-trip.</item>
/// </list> /// </list>
/// ///
/// <para><b>Why per-message tasks</b>: the current AcBinary streaming API does not allow long-lived /// <para><b>Per-iter overhead</b>: 0 new <c>Task.Run</c>, 0 new <c>AsyncPipeReaderInput</c>, 0 new <c>CancellationTokenSource</c>.
/// <see cref="AsyncPipeReaderInput"/> reuse across multiple messages on a raw transport — see /// Pure cost = <c>SerializeChunkedFramed</c> (CPU + chunk-onkénti flush) + kernel write/read syscalls + 1 sync barrier
/// <c>BINARY_ISSUES.md#accore-bin-i-q4t8</c>. This is therefore the canonical pattern, mirrored after /// (channel) + deserialized graph alloc. The "multi-message reuse" pattern enabled by Q4T8 fix (R5K2 minimum: <c>_readPos = -1</c>
/// <c>AcBinaryDeserializer.DeserializeFromPipeReaderAsync</c>'s internals. The Task.Run pair + per-iter /// sentinel + <c>AppendToBuffer</c> sliding-window cycling).</para>
/// <c>AsyncPipeReaderInput</c> allocation are an intrinsic cost of the API today, NOT a benchmark artifact.</para>
/// ///
/// <para><b>Approximation note</b>: single-process loopback NamedPipe. Real cross-process / cross-machine SignalR /// <para><b>Approximation note</b>: single-process loopback NamedPipe. Real cross-process / cross-machine SignalR
/// adds further transport latency (TCP, WebSocket framing) on top. The benchmark gives a lower bound.</para> /// adds further transport latency (TCP, WebSocket framing) on top. The benchmark gives a lower bound.</para>
@ -853,6 +855,11 @@ public static class Program
private readonly NamedPipeClientStream _pipeClient; private readonly NamedPipeClientStream _pipeClient;
private readonly PipeWriter _pipeWriter; private readonly PipeWriter _pipeWriter;
private readonly PipeReader _pipeReader; private readonly PipeReader _pipeReader;
// Long-lived multi-message receive infrastructure (set up once in ctor).
private readonly AsyncPipeReaderInput _input;
private readonly CancellationTokenSource _cts;
private readonly Task _drainTask;
private bool _disposed; private bool _disposed;
public string Engine => EngineAcBinary; public string Engine => EngineAcBinary;
@ -861,35 +868,43 @@ public static class Program
public string OptionsPreset { get; } public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length; public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0; public long SetupAllocBytes => 0;
public bool IsRoundTripOnly => true; // Serialize() does the full per-message round-trip; Deserialize() is a no-op public bool IsRoundTripOnly => true;
public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,SerializeChunked+AsyncPipeReaderInput)"; public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)";
public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{ {
_order = order; _order = order;
_options = options; _options = options;
// 4 KB chunk size for the AsyncPipeWriterOutput (raw mode — no [201][UINT16][data] framing). // EXPERIMENTAL: 64 KB chunk size (UINT16 max) — minimises per-chunk FlushAsync syscalls on NamedPipe.
// Aligns with Kestrel slab + TCP MTU, the realistic SignalR-style profile. // Diagnostic comparison vs the 4 KB SignalR-realistic profile to see how much the chunk-flush count
_options.BufferWriterChunkSize = 4096; // dominates the NamedPipe overhead. Restore to 4096 (Kestrel/TCP-MTU aligned) once we have the data.
_options.BufferWriterChunkSize = AsyncPipeWriterOutput.MaxChunkSize;
OptionsPreset = optionsPreset; OptionsPreset = optionsPreset;
_serialized = AcBinarySerializer.Serialize(order, _options); _serialized = AcBinarySerializer.Serialize(order, _options);
// 1× setup — long-lived NamedPipe + PipeWriter/PipeReader on top of the streams. // 1× pipe setup
// Byte mode (not Message mode) — AcBinary's chunked-stream wire format manages its own boundaries
// via the deserializer's structural knowledge of when an object graph ends.
var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}"; var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}";
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
_pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous); _pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
var serverWait = _pipeServer.WaitForConnectionAsync(); var serverWait = _pipeServer.WaitForConnectionAsync();
_pipeClient.Connect(); _pipeClient.Connect();
serverWait.GetAwaiter().GetResult(); serverWait.GetAwaiter().GetResult();
_pipeWriter = PipeWriter.Create(_pipeClient); _pipeWriter = PipeWriter.Create(_pipeClient);
_pipeReader = PipeReader.Create(_pipeServer); _pipeReader = PipeReader.Create(_pipeServer);
// 1× multi-message receive infrastructure: long-lived input + 1 background drain task.
// Per-iter Serialize() does its own Deserialize<T>(input, opts) call on the calling thread —
// strictly sequential per the calling thread's loop, so the producer (drain) and consumer
// (deserialiser, on the calling thread) cannot race on the buffer.
_input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true);
_cts = new CancellationTokenSource();
// Drain task: pumps PipeReader → input.Feed forever (or until cancel). Single Task.Run for
// the full benchmark lifetime (NOT per iteration) — its overhead is amortised across all messages.
_drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token));
} }
public void Warmup(int iterations) public void Warmup(int iterations)
@ -903,23 +918,16 @@ public static class Program
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
public void Serialize() public void Serialize()
{ {
using var input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false); // Sender: multi-message wire framing — [201][UINT16][data]...[202]. The Flush() inside
using var cts = new CancellationTokenSource(); // SerializeChunkedFramed writes the [202] CHUNK_END marker and flushes the kernel buffer.
AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options);
// Receiver tasks must be ready BEFORE the sender flushes — otherwise the FlushAsync deadlocks // Receiver: synchronous Deserialize<T> on the calling thread. Blocks (via TryAdvanceSegment's
// waiting for someone to drain the kernel pipe buffer (NamedPipe loopback flow control). // MRES.Wait) until the drain task has fed enough bytes for the structurally-complete graph.
var drainTask = input.DrainFromAsync(_pipeReader, cts.Token); // Returns when the graph is complete; finally block calls input.MessageDone() which arms
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize<TestOrder>(input, _options), cts.Token); // _readPos = -1 sentinel for the next Append-cycle. Strictly sequential on the calling thread:
// the next Serialize() call's SerializeChunkedFramed only runs after this Deserialize<T> returns.
AcBinarySerializer.SerializeChunked(_order, _pipeWriter, _options); _ = AcBinaryDeserializer.Deserialize<TestOrder>(_input, _options);
_pipeWriter.FlushAsync(cts.Token).AsTask().GetAwaiter().GetResult();
_ = deserTask.GetAwaiter().GetResult();
cts.Cancel();
try { drainTask.GetAwaiter().GetResult(); }
catch (OperationCanceledException) { }
catch (AggregateException ae) when (ae.InnerException is OperationCanceledException) { }
} }
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
@ -930,23 +938,9 @@ public static class Program
public bool VerifyRoundTrip() public bool VerifyRoundTrip()
{ {
// Single round-trip via the same path Serialize() uses, with the deserialized graph compared. // Round-trip one message synchronously on the calling thread.
using var input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false); AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options);
using var cts = new CancellationTokenSource(); var result = AcBinaryDeserializer.Deserialize<TestOrder>(_input, _options);
var drainTask = input.DrainFromAsync(_pipeReader, cts.Token);
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize<TestOrder>(input, _options), cts.Token);
AcBinarySerializer.SerializeChunked(_order, _pipeWriter, _options);
_pipeWriter.FlushAsync(cts.Token).AsTask().GetAwaiter().GetResult();
var result = deserTask.GetAwaiter().GetResult();
cts.Cancel();
try { drainTask.GetAwaiter().GetResult(); }
catch (OperationCanceledException) { }
catch (AggregateException ae) when (ae.InnerException is OperationCanceledException) { }
return result != null && DeepEqualsViaJson(_order, result); return result != null && DeepEqualsViaJson(_order, result);
} }
@ -954,12 +948,18 @@ public static class Program
{ {
if (_disposed) return; if (_disposed) return;
_disposed = true; _disposed = true;
// Complete the writer side first → the underlying pipe stream signals EOF, the reader sees it,
// any in-flight DrainFromAsync exits cleanly. Then dispose the streams. // Cancel drain task → DrainFromAsync exits → input.Complete() in its finally.
try { _cts.Cancel(); } catch { /* swallow on teardown */ }
try { _drainTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
// Complete writer + dispose pipe lifecycle.
try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ } try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
try { _pipeReader.Complete(); } catch { /* swallow on teardown */ } try { _pipeReader.Complete(); } catch { /* swallow on teardown */ }
try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ } try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ } try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
try { _input.Dispose(); } catch { /* swallow on teardown */ }
try { _cts.Dispose(); } catch { /* swallow on teardown */ }
} }
} }

View File

@ -309,11 +309,33 @@ public static partial class AcBinaryDeserializer
/// <inheritdoc cref="Deserialize{T}(AsyncPipeReaderInput)"/> /// <inheritdoc cref="Deserialize{T}(AsyncPipeReaderInput)"/>
public static T? Deserialize<T>(AsyncPipeReaderInput input, AcBinarySerializerOptions options) public static T? Deserialize<T>(AsyncPipeReaderInput input, AcBinarySerializerOptions options)
=> DeserializeSequence<T, AsyncPipeReaderInputAdapter>(new AsyncPipeReaderInputAdapter(input), typeof(T), options); {
try
{
return DeserializeSequence<T, AsyncPipeReaderInputAdapter>(new AsyncPipeReaderInputAdapter(input), typeof(T), options);
}
finally
{
// Consumer-side "I'm done with this message" signal — arms _readPos = -1 sentinel so the
// next AppendToBuffer recycles the buffer to 0 for the next message on the long-lived input.
// Race-free: the deserializer has finished reading by this point (try block returned),
// so the producer is free to mutate _writePos / _readPos.
input.MessageDone();
}
}
/// <inheritdoc cref="Deserialize{T}(AsyncPipeReaderInput)"/> /// <inheritdoc cref="Deserialize{T}(AsyncPipeReaderInput)"/>
public static object? Deserialize(AsyncPipeReaderInput input, Type targetType, AcBinarySerializerOptions options) public static object? Deserialize(AsyncPipeReaderInput input, Type targetType, AcBinarySerializerOptions options)
=> DeserializeSequence(new AsyncPipeReaderInputAdapter(input), targetType, options); {
try
{
return DeserializeSequence(new AsyncPipeReaderInputAdapter(input), targetType, options);
}
finally
{
input.MessageDone();
}
}
/// <summary> /// <summary>
/// Deserialize from a <see cref="System.IO.Pipelines.PipeReader"/> with full streaming pipeline /// Deserialize from a <see cref="System.IO.Pipelines.PipeReader"/> with full streaming pipeline

View File

@ -201,18 +201,18 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
} }
else if (marker == ChunkEnd) else if (marker == ChunkEnd)
{ {
// [202] = end of CURRENT message (NOT end of session). Two-step signal: // [202] = end of CURRENT message on the WIRE (NOT end of session). Reset only the
// (a) reset framing state machine to AwaitingHeader for the next [201] header, // framing state machine to AwaitingHeader for the next [201] header.
// (b) write _readPos = -1 sentinel — picked up by the next AppendToBuffer's // Buffer-cursor recycling is NOT triggered here — the producer-thread cannot safely
// sliding-window cycling, which resets the buffer to 0 for the new message. // reset _writePos = 0 while the consumer-thread may still be reading earlier bytes
// of the just-finished message (multi-chunk messages: by the time the producer
// parses [202], the consumer may have only consumed part of the buffer).
// The buffer-cycle signal is emitted by the consumer via MessageDone() — typically
// from the AcBinaryDeserializer.Deserialize<T>(AsyncPipeReaderInput, opts) finally
// block, AFTER the deserialiser has finished reading and returned the graph.
// _completed stays false — only external Complete() / stream-EOF marks session end. // _completed stays false — only external Complete() / stream-EOF marks session end.
// The sentinel is wire-format intrinsic: TryAdvanceSegment / Initialize handle EmitDiagnostic("Feed: CHUNK_END [202] received — framing reset; awaiting MessageDone() from consumer for buffer recycle");
// _readPos < 0 defensively (treat as "fully consumed"), so the consumer never
// observes the sentinel directly — by the time the consumer reaches the next
// Initialize call, AppendToBuffer has already cycled _readPos back to 0.
EmitDiagnostic("Feed: CHUNK_END [202] received — framing reset, _readPos sentinel armed");
_framingState = FramingState.AwaitingHeader; _framingState = FramingState.AwaitingHeader;
Volatile.Write(ref _readPos, -1);
} }
else else
{ {
@ -309,6 +309,41 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
EmitDiagnostic($"Complete writePos={Volatile.Read(ref _writePos)} readPos={Volatile.Read(ref _readPos)}"); EmitDiagnostic($"Complete writePos={Volatile.Read(ref _writePos)} readPos={Volatile.Read(ref _readPos)}");
} }
/// <summary>
/// Whether <see cref="Complete"/> has been called (or <see cref="DrainFromAsync"/>'s underlying
/// stream signalled EOF and the finally block closed the input). Once <c>true</c>, the session
/// has ended — any pending <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// call returns whatever partial buffer is left, and subsequent calls return immediately.
/// </summary>
public bool IsCompleted => Volatile.Read(ref _completed);
/// <summary>
/// Called by the consumer to signal "I have finished reading the current message" — typically
/// from the <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// finally block, AFTER the deserialiser has finished reading and the structurally-complete graph
/// has been returned. Arms a <c>_readPos = -1</c> sentinel that the next
/// <see cref="AppendToBuffer"/> picks up for sliding-window cycling — recycles the buffer to 0
/// for the next message on the long-lived input.
///
/// <para><b>Why the consumer signals (not the producer)</b>: the producer parses <c>[202]</c>
/// strictly on the wire — at the moment <c>[202]</c> arrives, the consumer-thread may still be
/// mid-graph reading earlier bytes of the just-finished message (especially for multi-chunk
/// messages, where the producer outpaces the consumer). Recycling the buffer based on
/// producer-side <c>[202]</c> alone races with the in-flight consumer read. By emitting the
/// signal from the consumer's "I'm done"-moment instead, both orderings are guaranteed: the
/// consumer has finished reading, AND the wire-side <c>[202]</c> has long since been parsed
/// (since the consumer reads only what the producer wrote).</para>
///
/// <para><b>Idempotent</b>: safe to call multiple times. No-op if the session has already
/// completed (<see cref="IsCompleted"/> is <c>true</c>) — there are no further messages.</para>
/// </summary>
public void MessageDone()
{
if (Volatile.Read(ref _completed)) return; // session already over
Volatile.Write(ref _readPos, -1);
EmitDiagnostic("MessageDone: _readPos sentinel armed for next AppendToBuffer");
}
// --- IBinaryInputBase (consumer thread) --- // --- IBinaryInputBase (consumer thread) ---
/// <summary> /// <summary>