using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.Serialization; // DrainFromAsync extension (test-only, used by benchmark)
using AyCode.Core.Tests.TestModels;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Runtime.CompilerServices;

namespace AyCode.Core.Benchmarks.Workloads.Scenarios;

/// <summary>
/// Round-trip benchmark for AcBinary over one long-lived named pipe. Every timed iteration writes the
/// order through <c>AcBinarySerializer.SerializeChunkedFramed</c> into a <see cref="PipeWriter"/> on the
/// client end, while two background tasks on the server end drain the <see cref="PipeReader"/> into a
/// shared <c>AsyncPipeReaderInput</c> and deserialize from it.
/// </summary>
/// <remarks>
/// <para>
/// <b>Setup (constructor, not timed):</b> creates the <see cref="NamedPipeServerStream"/> /
/// <see cref="NamedPipeClientStream"/> pair, connects them, wraps them in one
/// <see cref="PipeWriter"/> / <see cref="PipeReader"/> pair, creates one <c>AsyncPipeReaderInput</c>
/// with <c>multiMessage: true</c>, and starts exactly two background tasks: a drain task
/// (<c>DrainFromAsync</c>) and a consumer task (<see cref="ConsumeLoop"/>).
/// </para>
/// <para>
/// <b>Per iteration (timed, in <see cref="Serialize"/>):</b> the calling thread signals the consumer,
/// serializes into the pipe, then blocks until the consumer reports that
/// <c>AcBinaryDeserializer.Deserialize&lt;T&gt;</c> returned. The hand-off uses two
/// <see cref="ManualResetEventSlim"/> instances; no task, input or cancellation source is created
/// per iteration. <see cref="Deserialize"/> is a no-op and <see cref="IsRoundTripOnly"/> is
/// <c>true</c>, so the "Ser ms" / "SerAlloc" report columns are N/A and "RT ms" is the full round trip.
/// </para>
/// <para>
/// <b>Wire format (per the original author's notes, not visible in this file):</b> multi-message
/// framing <c>[201][UINT16][data]...[202]</c>; the <c>[202]</c> end marker arms the input's
/// <c>_readPos = -1</c> sentinel so the next message's first <c>AppendToBuffer</c> recycles the buffer
/// (the "multi-message reuse" pattern from the Q4T8 fix / R5K2 minimum).
/// </para>
/// <para>
/// <b>Approximation note:</b> this is a single-process loopback named pipe. Real cross-process or
/// cross-machine transports add further latency on top, so the numbers are a lower bound.
/// </para>
/// </remarks>
/// <typeparam name="T">Type of the object graph that is serialized and deserialized.</typeparam>
public sealed class AcBinaryNamedPipeBenchmark<T> : ISerializerBenchmark, IDisposable where T : class
{
    private readonly T _order;
    private readonly AcBinarySerializerOptions _options;
    private readonly byte[] _serialized; // for SerializedSize reporting only

    // Long-lived pipe lifecycle (set up once in ctor — NOT timed).
    private readonly NamedPipeServerStream _pipeServer;
    private readonly NamedPipeClientStream _pipeClient;
    private readonly PipeWriter _pipeWriter;
    private readonly PipeReader _pipeReader;

    // Long-lived multi-message receive infrastructure (set up once in ctor).
    private readonly AsyncPipeReaderInput _input;
    private readonly CancellationTokenSource _cts;
    private readonly Task _drainTask;    // BG: PipeReader → input (continuous pump)
    private readonly Task _consumerTask; // BG: per-iter Deserialize(input) loop, signaled by calling thread

    // Hand-off events: the calling thread sets _consumeRequest, the consumer sets _consumeDone.
    private readonly ManualResetEventSlim _consumeRequest = new(false);
    private readonly ManualResetEventSlim _consumeDone = new(false);

    private object? _lastResult;       // captured during VerifyRoundTrip; null in benchmark iters
    private bool _captureResult;       // toggle: when true, ConsumeLoop stores result; otherwise discards
    private Exception? _consumerError; // last exception thrown by Deserialize on the consumer task
    private bool _disposed;

    /// <summary>Serializer engine reported for this benchmark.</summary>
    public BenchmarkEngine Engine => BenchmarkEngine.AcBinary;

    /// <summary>Transport reported for this benchmark.</summary>
    public BenchmarkIoMode IoMode => BenchmarkIoMode.NamedPipe;

    /// <summary>Source-generated or runtime dispatch, taken from the options' <c>UseGeneratedCode</c> flag.</summary>
    public BenchmarkDispatchMode DispatchMode =>
        _options.UseGeneratedCode ? BenchmarkDispatchMode.SGen : BenchmarkDispatchMode.Runtime;

    /// <summary>Runtime type of the benchmarked object graph.</summary>
    public Type OrderType => typeof(T);

    /// <summary>Name of the options preset passed to the constructor.</summary>
    public string OptionsPreset { get; }

    /// <summary>Byte length of a plain in-memory serialization of the order (reporting only).</summary>
    public int SerializedSize => _serialized.Length;

    /// <summary>Bytes allocated on the constructing thread while setting up the pipe pair and writer.</summary>
    public long SetupSerializeAllocBytes { get; }

    /// <summary>Bytes allocated on the constructing thread while setting up the reader, input and tasks.</summary>
    public long SetupDeserializeAllocBytes { get; }

    /// <summary>Always <c>true</c>: the full round trip is measured inside <see cref="Serialize"/>.</summary>
    public bool IsRoundTripOnly => true;

    /// <summary>Human-readable description of the options plus the transport configuration.</summary>
    public string OptionsDescription =>
        BenchmarkOptions.BuildAcBinary(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage,2-task)");

    /// <summary>
    /// Exception thrown by the most recent background deserialize, or <c>null</c> when it succeeded.
    /// Reset at the start of every <see cref="Serialize"/> call. The benchmark loop is never interrupted
    /// by such a failure; this property only makes it observable.
    /// </summary>
    public Exception? LastConsumerError => _consumerError;

    /// <summary>
    /// Builds the pipe pair, the reader/writer wrappers, the shared input and the two background tasks.
    /// None of this is timed; the allocation cost is reported through
    /// <see cref="SetupSerializeAllocBytes"/> and <see cref="SetupDeserializeAllocBytes"/>.
    /// </summary>
    /// <param name="order">Object graph sent on every iteration.</param>
    /// <param name="options">Serializer options; used as-is and never mutated here.</param>
    /// <param name="optionsPreset">Preset label surfaced through <see cref="OptionsPreset"/>.</param>
    public AcBinaryNamedPipeBenchmark(T order, AcBinarySerializerOptions options, string optionsPreset)
    {
        _order = order;

        // BufferWriterChunkSize comes from the caller (central source of truth in CreateSerializers
        // — the binaryFastMode4KbChunk options instance). Do NOT mutate _options here; tune the chunk
        // size in CreateSerializers only.
        _options = options;
        OptionsPreset = optionsPreset;
        _serialized = AcBinarySerializer.Serialize(order, _options);

        // 1× pipe setup. The kernel-side pipe buffer sizes passed to the server ctor are set to
        // BufferWriterChunkSize so that (per the original author's notes) one chunk written by the
        // serializer maps onto one kernel buffer slot. _options.BufferWriterChunkSize is the single
        // tunable source.
        var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}";

        // === SERIALIZE-side setup measurement ===
        // pipe-pair (server + client) + connect handshake + writer-side PipeWriter wrapper.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        var beforeSer = GC.GetAllocatedBytesForCurrentThread();

        _pipeServer = new NamedPipeServerStream(
            pipeName,
            PipeDirection.In,
            1,
            PipeTransmissionMode.Byte,
            System.IO.Pipes.PipeOptions.Asynchronous,
            inBufferSize: _options.BufferWriterChunkSize,
            outBufferSize: _options.BufferWriterChunkSize);
        _pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);

        // Start the server-side wait first so the synchronous client Connect() has a listener.
        var serverWait = _pipeServer.WaitForConnectionAsync();
        _pipeClient.Connect();
        serverWait.GetAwaiter().GetResult();

        _pipeWriter = PipeWriter.Create(_pipeClient);

        var afterSer = GC.GetAllocatedBytesForCurrentThread();
        SetupSerializeAllocBytes = afterSer - beforeSer;

        // === DESERIALIZE-side setup measurement ===
        // PipeReader wrapper + AsyncPipeReaderInput + drain task + consumer task scaffolding.
        // Two long-lived BG tasks total: drain pumps bytes from the kernel pipe into input;
        // consumer drives Deserialize(input) per iter on signal.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        var beforeDes = GC.GetAllocatedBytesForCurrentThread();

        _pipeReader = PipeReader.Create(_pipeServer);
        _input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true);
        _cts = new CancellationTokenSource();

        // Drain task: pumps PipeReader → input until cancelled. Single Task.Run for the full
        // benchmark lifetime — its overhead is amortised across all messages.
        _drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token));

        // Consumer task: per-iter Deserialize(input) loop. Started here once; signaled per-iter via
        // _consumeRequest so that serialization (calling thread) and deserialization (this task)
        // can overlap.
        _consumerTask = Task.Run(ConsumeLoop);

        var afterDes = GC.GetAllocatedBytesForCurrentThread();
        SetupDeserializeAllocBytes = afterDes - beforeDes;
    }

    /// <summary>
    /// Background consumer: parks on <c>_consumeRequest</c>, runs one deserialize against the shared
    /// input when signaled, then sets <c>_consumeDone</c>. Exits when the cancellation token fires.
    /// </summary>
    private void ConsumeLoop()
    {
        var ct = _cts.Token;
        try
        {
            while (true)
            {
                _consumeRequest.Wait(ct);
                if (ct.IsCancellationRequested)
                {
                    return;
                }

                // Re-arm before doing the work so the next Set() from the calling thread is not lost.
                _consumeRequest.Reset();
                try
                {
                    var result = AcBinaryDeserializer.Deserialize<T>(_input, _options);
                    if (_captureResult)
                    {
                        _lastResult = result;
                    }
                }
                catch (Exception ex)
                {
                    // Best-effort by design: the benchmark loop keeps going and VerifyRoundTrip reports
                    // the failure through a missing result. Keep the exception so it is not lost.
                    _consumerError = ex;
                }
                finally
                {
                    // Always release the calling thread, even when Deserialize failed.
                    _consumeDone.Set();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cooperative cancel — Dispose path.
        }
        catch (ObjectDisposedException)
        {
            // Dispose timed out waiting for this task and already tore the events down.
        }
    }

    /// <summary>
    /// One full round trip: signals the consumer, serializes the order into the pipe, and blocks until
    /// the consumer reports that its deserialize call returned (successfully or not).
    /// </summary>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Serialize()
    {
        // 2-task streaming pipeline:
        //   1. Signal the consumer so it enters Deserialize(input) and waits for bytes.
        //   2. Serialize on the calling thread; bytes flow PipeWriter → kernel pipe → drain task → input.
        //   3. Wait for _consumeDone, i.e. for Deserialize to return.
        _consumerError = null;
        _consumeDone.Reset();
        _consumeRequest.Set();
        AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options);
        _consumeDone.Wait();
    }

    /// <summary>No-op: the per-iteration round trip is captured in <see cref="Serialize"/>.</summary>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Deserialize()
    {
        // Intentionally empty. See IsRoundTripOnly.
    }

    /// <summary>
    /// Runs one iteration through the same streaming path as the benchmark, capturing the deserialized
    /// object, and compares it with the original via <c>RoundTripValidator.DeepEqualsViaJson</c>.
    /// </summary>
    /// <returns><c>true</c> when a non-null result was produced and it equals the original order.</returns>
    public bool VerifyRoundTrip()
    {
        _captureResult = true;
        try
        {
            Serialize();
            var result = _lastResult as T;
            return result != null && RoundTripValidator.DeepEqualsViaJson(_order, result);
        }
        finally
        {
            // Back to discard mode so timed iterations do not keep the graph alive.
            _captureResult = false;
            _lastResult = null;
        }
    }

    /// <summary>
    /// Cancels both background tasks, waits up to two seconds for each, then completes and disposes the
    /// pipe, the input and the synchronization primitives. Every step is best-effort and never throws.
    /// Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Cancel drain + consumer tasks → both exit. Pulse _consumeRequest in case consumer is parked.
        Quietly(() => _cts.Cancel());
        Quietly(() => _consumeRequest.Set());
        Quietly(() => _drainTask.Wait(TimeSpan.FromSeconds(2)));
        Quietly(() => _consumerTask.Wait(TimeSpan.FromSeconds(2)));

        // Complete writer + dispose pipe lifecycle.
        Quietly(() => _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)));
        Quietly(() => _pipeReader.Complete());
        Quietly(() => _pipeClient.Dispose());
        Quietly(() => _pipeServer.Dispose());
        Quietly(() => _input.Dispose());
        Quietly(() => _consumeRequest.Dispose());
        Quietly(() => _consumeDone.Dispose());
        Quietly(() => _cts.Dispose());
    }

    /// <summary>Runs one teardown step and swallows whatever it throws.</summary>
    private static void Quietly(Action step)
    {
        try
        {
            step();
        }
        catch
        {
            /* swallow on teardown */
        }
    }
}