AyCode.Core/AyCode.Core.Serializers.Con.../Benchmarks/AcBinaryNamedPipeBenchmark.cs

239 lines
13 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.Serialization; // DrainFromAsync extension (test-only, used by benchmark)
using AyCode.Core.Tests.TestModels;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Runtime.CompilerServices;
namespace AyCode.Core.Serializers.Console.Benchmarks;
/// <summary>
/// Benchmarks AcBinary over a long-lived NamedPipe IPC connection using the AcBinary native streaming API
/// (<see cref="AcBinarySerializer.SerializeChunked{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>
/// + <see cref="AsyncPipeReaderInput"/> + <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/>).
/// Mirrors what a real consumer (e.g. <c>DeserializeFromPipeReaderAsync</c>) does per message:
/// long-lived <see cref="AsyncPipeReaderInput"/> with multi-message wire framing on top of a long-lived NamedPipe.
///
/// <para><b>Architecture</b>:</para>
/// <list type="bullet">
/// <item>Constructor (NOT timed): sets up <see cref="NamedPipeServerStream"/> + <see cref="NamedPipeClientStream"/>,
/// waits for connection, creates one long-lived <see cref="System.IO.Pipelines.PipeWriter"/> /
/// <see cref="System.IO.Pipelines.PipeReader"/> pair, ONE long-lived <see cref="AsyncPipeReaderInput"/>
/// in <c>multiMessage = true</c> mode, ONE drain Task that pumps <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/>
/// forever, and ONE consumer Task that runs <c>AcBinaryDeserializer.Deserialize&lt;T&gt;(input, opts)</c>
/// each time it is signaled, coordinating with the calling thread through a pair of
/// <see cref="System.Threading.ManualResetEventSlim"/> events (request / done).</item>
/// <item>Per-iteration <see cref="Serialize"/> (timed): sender writes via
/// <see cref="AcBinarySerializer.SerializeChunkedFramed{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>
/// — multi-message wire (<c>[201][UINT16][data]...[202]</c>); the <c>[202]</c> end marker arms the input's
/// <c>_readPos = -1</c> sentinel, so the next message's first <c>AppendToBuffer</c> recycles the buffer to 0.
/// The calling thread then blocks on the "done" event until the consumer task's <c>Deserialize</c> call has returned.</item>
/// <item><see cref="Deserialize"/> is a no-op (full round-trip captured in <see cref="Serialize"/>);
/// <see cref="IsRoundTripOnly"/>=true → the Ser ms / SerAlloc columns are N/A, RT ms = full round-trip.</item>
/// </list>
///
/// <para><b>Per-iter overhead</b>: 0 new <c>Task.Run</c>, 0 new <c>AsyncPipeReaderInput</c>, 0 new <c>CancellationTokenSource</c>.
/// Pure cost = <c>SerializeChunkedFramed</c> (CPU + per-chunk flush) + kernel write/read syscalls + 1 sync barrier
/// (<c>ManualResetEventSlim</c> handshake) + deserialized graph alloc. The "multi-message reuse" pattern is enabled by the Q4T8 fix (R5K2 minimum: <c>_readPos = -1</c>
/// sentinel + <c>AppendToBuffer</c> sliding-window cycling).</para>
///
/// <para><b>Approximation note</b>: single-process loopback NamedPipe. Real cross-process / cross-machine SignalR
/// adds further transport latency (TCP, WebSocket framing) on top. The benchmark gives a lower bound.</para>
/// </summary>
internal sealed class AcBinaryNamedPipeBenchmark<T> : ISerializerBenchmark, IDisposable where T : class
{
    /// <summary>Upper bound for each blocking wait performed during teardown.</summary>
    private static readonly TimeSpan TeardownTimeout = TimeSpan.FromSeconds(2);

    private readonly T _order;
    private readonly AcBinarySerializerOptions _options;

    // Only the payload length is reported (see SerializedSize), so the serialized buffer itself
    // is not kept alive for the lifetime of the benchmark.
    private readonly int _serializedSize;

    // Long-lived pipe lifecycle (set up once in ctor — NOT timed).
    private readonly NamedPipeServerStream _pipeServer;
    private readonly NamedPipeClientStream _pipeClient;
    private readonly PipeWriter _pipeWriter;
    private readonly PipeReader _pipeReader;

    // Long-lived multi-message receive infrastructure (set up once in ctor).
    private readonly AsyncPipeReaderInput _input;
    private readonly CancellationTokenSource _cts;
    private readonly Task _drainTask;    // BG: PipeReader → input (continuous pump via DrainFromAsync)
    private readonly Task _consumerTask; // BG: per-iter Deserialize<T>(input) loop, signaled by calling thread

    // Handshake between the calling thread and the consumer task:
    // _consumeRequest = "start one Deserialize", _consumeDone = "that Deserialize has finished".
    private readonly ManualResetEventSlim _consumeRequest = new(false);
    private readonly ManualResetEventSlim _consumeDone = new(false);

    private object? _lastResult;  // captured during VerifyRoundTrip; null in benchmark iters
    private bool _captureResult;  // toggle: when true, ConsumeLoop stores result; otherwise discards
    private bool _disposed;

    /// <summary>Serializer engine under test: always <see cref="BenchmarkEngine.AcBinary"/>.</summary>
    public BenchmarkEngine Engine => BenchmarkEngine.AcBinary;

    /// <summary>Transport used by this benchmark: always <see cref="BenchmarkIoMode.NamedPipe"/>.</summary>
    public BenchmarkIoMode IoMode => BenchmarkIoMode.NamedPipe;

    /// <summary>
    /// <see cref="BenchmarkDispatchMode.SGen"/> when the options request generated code,
    /// otherwise <see cref="BenchmarkDispatchMode.Runtime"/>.
    /// </summary>
    public BenchmarkDispatchMode DispatchMode => _options.UseGeneratedCode ? BenchmarkDispatchMode.SGen : BenchmarkDispatchMode.Runtime;

    /// <summary>Runtime type of the benchmarked payload.</summary>
    public Type OrderType => typeof(T);

    /// <summary>Name of the options preset, as passed to the constructor.</summary>
    public string OptionsPreset { get; }

    /// <summary>Byte length of one non-streamed <c>AcBinarySerializer.Serialize</c> of the payload (reporting only).</summary>
    public int SerializedSize => _serializedSize;

    /// <summary>Bytes allocated on the constructing thread while building the writer side (pipe pair + connect + <see cref="PipeWriter"/>).</summary>
    public long SetupSerializeAllocBytes { get; }

    /// <summary>Bytes allocated on the constructing thread while building the reader side (<see cref="PipeReader"/>, input, CTS, task start-up).</summary>
    public long SetupDeserializeAllocBytes { get; }

    /// <summary>Always <c>true</c>: <see cref="Serialize"/> performs the complete round trip and <see cref="Deserialize"/> is a no-op.</summary>
    public bool IsRoundTripOnly => true;

    /// <summary>Human-readable description of the options plus the buffer size and transport shape used here.</summary>
    public string OptionsDescription => BenchmarkOptions.BuildAcBinary(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage,2-task)");

    /// <summary>
    /// Builds the long-lived pipe pair and the two background tasks (drain + consumer). Not timed.
    /// </summary>
    /// <param name="order">Payload that is serialized on every iteration.</param>
    /// <param name="options">Serializer options; used as-is and never mutated here.</param>
    /// <param name="optionsPreset">Display name of the options preset.</param>
    /// <remarks>
    /// If any setup step throws, everything created so far is torn down before the exception
    /// propagates, so a failed construction does not leave pipe handles or tasks behind.
    /// </remarks>
    public AcBinaryNamedPipeBenchmark(T order, AcBinarySerializerOptions options, string optionsPreset)
    {
        _order = order;

        // BufferWriterChunkSize comes from the caller (central source of truth in CreateSerializers
        // — the binaryFastMode4KbChunk options instance). Do NOT mutate _options here; tune the chunk
        // size in CreateSerializers only.
        _options = options;
        OptionsPreset = optionsPreset;

        byte[] serialized = AcBinarySerializer.Serialize(order, _options);
        _serializedSize = serialized.Length;

        // 1× pipe setup. The kernel-side pipe buffer (inBufferSize / outBufferSize on the server ctor)
        // is sized to BufferWriterChunkSize, per the original design note that one chunk on the wire
        // (header + data) should correspond to one pipe write. _options.BufferWriterChunkSize is the
        // single tunable source.
        var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}";

        try
        {
            // === SERIALIZE-side setup measurement ===
            // pipe-pair (server + client) + connect handshake + writer-side PipeWriter wrapper.
            ForceFullCollection();
            var beforeSer = GC.GetAllocatedBytesForCurrentThread();

            _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte,
                System.IO.Pipes.PipeOptions.Asynchronous,
                inBufferSize: _options.BufferWriterChunkSize,
                outBufferSize: _options.BufferWriterChunkSize);
            _pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);

            // Start the server-side accept first, then connect synchronously, then observe the accept.
            var serverWait = _pipeServer.WaitForConnectionAsync();
            _pipeClient.Connect();
            serverWait.GetAwaiter().GetResult();

            _pipeWriter = PipeWriter.Create(_pipeClient);

            var afterSer = GC.GetAllocatedBytesForCurrentThread();
            SetupSerializeAllocBytes = afterSer - beforeSer;

            // === DESERIALIZE-side setup measurement ===
            // PipeReader wrapper + AsyncPipeReaderInput + CTS + drain task + consumer task scaffolding.
            // Two long-lived BG tasks total: drain pumps bytes from the pipe into input; consumer
            // drives Deserialize<T>(input) per iter on signal.
            // NOTE: the counter is per-thread, so only allocations made on this (constructing) thread
            // are included; work done inside the started tasks on other threads is not.
            ForceFullCollection();
            var beforeDes = GC.GetAllocatedBytesForCurrentThread();

            _pipeReader = PipeReader.Create(_pipeServer);
            _input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true);
            _cts = new CancellationTokenSource();

            // Drain task: pumps PipeReader → input until cancelled. Single Task.Run for the full
            // benchmark lifetime — its overhead is amortised across all messages.
            _drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token));

            // Consumer task: per-iter Deserialize<T>(input) loop. Started here once; signaled per-iter
            // via _consumeRequest so that deserialization can overlap with SerializeChunkedFramed
            // running on the calling thread.
            _consumerTask = Task.Run(ConsumeLoop);

            var afterDes = GC.GetAllocatedBytesForCurrentThread();
            SetupDeserializeAllocBytes = afterDes - beforeDes;
        }
        catch
        {
            // Partially constructed: nobody else can call Dispose on this instance, so release
            // whatever was created before letting the original exception propagate.
            Dispose();
            throw;
        }
    }

    /// <summary>
    /// Forces a blocking collection, drains finalizers and collects again so that the subsequent
    /// allocation-counter reads start from a settled heap.
    /// </summary>
    private static void ForceFullCollection()
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
    }

    /// <summary>
    /// Background consumer: parks on <c>_consumeRequest</c>, runs one <c>Deserialize&lt;T&gt;(_input)</c>
    /// when signaled, then signals <c>_consumeDone</c>. Exits when the token is cancelled.
    /// </summary>
    private void ConsumeLoop()
    {
        var ct = _cts.Token;
        try
        {
            while (true)
            {
                _consumeRequest.Wait(ct);

                // Dispose cancels first and then pulses the event, so Wait may return normally
                // instead of throwing; re-check before doing any work.
                if (ct.IsCancellationRequested)
                {
                    return;
                }

                _consumeRequest.Reset();
                try
                {
                    var result = AcBinaryDeserializer.Deserialize<T>(_input, _options);
                    if (_captureResult)
                    {
                        _lastResult = result;
                    }
                }
                catch
                {
                    // Deliberately swallowed — the calling thread sees the failure via a missing/incorrect
                    // _lastResult during VerifyRoundTrip, or the benchmark loop just continues (timing impacted).
                }
                finally
                {
                    // Always release the calling thread, even when deserialization failed.
                    _consumeDone.Set();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cooperative cancel — Dispose path. Swallow.
        }
    }

    /// <summary>
    /// One timed iteration: full serialize → pipe → deserialize round trip.
    /// </summary>
    /// <remarks>
    /// 1. Signal the consumer task to begin <c>Deserialize&lt;T&gt;(input)</c>.
    /// 2. Run <c>SerializeChunkedFramed</c> on the calling thread; the drain task forwards the bytes
    ///    to the input the consumer is reading from.
    /// 3. Block until the consumer signals that its <c>Deserialize</c> call has returned.
    /// </remarks>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Serialize()
    {
        // Order matters: clear "done" before raising "request", otherwise a stale "done" from the
        // previous iteration could release the final Wait immediately.
        _consumeDone.Reset();
        _consumeRequest.Set();
        AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options);
        _consumeDone.Wait();
    }

    /// <summary>No-op: the per-iteration round trip is captured in <see cref="Serialize"/> (see <see cref="IsRoundTripOnly"/>).</summary>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Deserialize()
    {
    }

    /// <summary>
    /// Runs one iteration through the same streaming path as the benchmark, capturing the
    /// deserialized instance and comparing it with the original payload.
    /// </summary>
    /// <returns>
    /// <c>true</c> when a result was captured and <c>BenchmarkLoop.DeepEqualsViaJson</c> reports it
    /// equal to the original; <c>false</c> otherwise (including when deserialization failed).
    /// </returns>
    public bool VerifyRoundTrip()
    {
        _captureResult = true;
        try
        {
            Serialize();
            return _lastResult is T roundTripped && BenchmarkLoop.DeepEqualsViaJson(_order, roundTripped);
        }
        finally
        {
            // Back to discard mode and drop the captured graph so it is not kept alive during timing.
            _captureResult = false;
            _lastResult = null;
        }
    }

    /// <summary>
    /// Best-effort teardown: cancels and waits (bounded) for both background tasks, completes the
    /// pipe reader/writer and disposes every owned resource. Never throws; safe to call more than
    /// once and safe on a partially constructed instance.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Cancel drain + consumer tasks → both exit. Pulse _consumeRequest in case consumer is parked.
        try { _cts?.Cancel(); } catch { /* swallow on teardown */ }
        try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ }
        try { _drainTask?.Wait(TeardownTimeout); } catch { /* swallow on teardown */ }
        try { _consumerTask?.Wait(TeardownTimeout); } catch { /* swallow on teardown */ }

        // Complete writer + dispose pipe lifecycle.
        try { _pipeWriter?.CompleteAsync().AsTask().Wait(TeardownTimeout); } catch { /* swallow on teardown */ }
        try { _pipeReader?.Complete(); } catch { /* swallow on teardown */ }
        try { _pipeClient?.Dispose(); } catch { /* swallow on teardown */ }
        try { _pipeServer?.Dispose(); } catch { /* swallow on teardown */ }
        try { _input?.Dispose(); } catch { /* swallow on teardown */ }
        try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ }
        try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ }
        try { _cts?.Dispose(); } catch { /* swallow on teardown */ }
    }
}