using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.TestModels;
using System.IO.Pipes;
using System.Runtime.CompilerServices;
namespace AyCode.Core.Serializers.Console.Benchmarks;
///
/// Raw byte[] over a long-lived NamedPipe — NO chunk-framing, NO AsyncPipeReaderInput,
/// NO sliding-window buffer. Calling thread serialises + writes; a long-lived background consumer task
/// reads and deserialises. Two-task pattern enables Ser↔Read overlap (kernel-pipe-pipelined) AND
/// avoids the kernel-buffer-full deadlock when bytes.Length > inBufferSize.
///
/// Side-by-side with (chunked-framed AsyncPipe stack) this
/// isolates two cost components on the SAME kernel-pipe transport with the SAME inBufferSize:
///
/// - This row vs (Byte[]) — pure kernel-NamedPipe
/// overhead (WriteFile / ReadFile syscalls + IRP queueing + buffer-copy + thread-handoff).
/// - This row vs (chunked-framed) — pure
/// AsyncPipe-framework overhead (chunk header writes + sliding-window Feed + MRES wait inside
/// AsyncPipeReaderInput) AND the streaming-pipeline benefit of intra-message Ser↔Des overlap (which
/// raw lacks — raw can only Ser↔Read overlap, with Des sequential after Read completes).
///
/// Per-iter byte[] allocation from AcBinarySerializer.Serialize is part of the cost (matches
/// 's API contract); the receive-side scratch buffer is also allocated per-iter
/// on the consumer-task (counted via GC.GetTotalAllocatedBytes in BenchmarkLoop.MeasureAllocationTotal).
///
internal sealed class AcBinaryNamedPipeRawByteArrayBenchmark : ISerializerBenchmark, IDisposable
{
private readonly TestOrder _order;
private readonly AcBinarySerializerOptions _options;
private readonly byte[] _serialized; // for SerializedSize reporting + receive-side size known upfront
// Long-lived pipe lifecycle (set up once in ctor — NOT timed).
private readonly NamedPipeServerStream _pipeServer;
private readonly NamedPipeClientStream _pipeClient;
// Long-lived consumer-task infrastructure (Read + Deserialize on BG thread, signaled per iter).
// Mirrors AcBinaryNamedPipeBenchmark's drain+consumer pair, but raw byte[] doesn't have an
// intermediate sliding-window buffer, so Read+Des happen sequentially in one BG task: Read N bytes
// → Deserialize(bytes) → signal done. Calling thread's Ser↔Write overlaps with this BG Read+Des
// through kernel-pipe pipelining.
private readonly CancellationTokenSource _cts;
private readonly Task _consumerTask;
private readonly ManualResetEventSlim _consumeRequest = new(false);
private readonly ManualResetEventSlim _consumeDone = new(false);
private int _pendingReadSize;
private object? _lastResult; // captured during VerifyRoundTrip; null in benchmark iters
private bool _captureResult; // toggle: when true, ConsumerLoop stores result; otherwise discards
private bool _disposed;
public BenchmarkEngine Engine => BenchmarkEngine.AcBinary;
public BenchmarkIoMode IoMode => BenchmarkIoMode.NamedPipeRaw;
public BenchmarkDispatchMode DispatchMode => _options.UseGeneratedCode ? BenchmarkDispatchMode.SGen : BenchmarkDispatchMode.Runtime;
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupSerializeAllocBytes { get; }
public long SetupDeserializeAllocBytes { get; }
public bool IsRoundTripOnly => true;
public string OptionsDescription => BenchmarkOptions.BuildAcBinary(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(raw,2-task)");
public AcBinaryNamedPipeRawByteArrayBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
_order = order;
// BufferWriterChunkSize comes from the caller — same source-of-truth contract as
// AcBinaryNamedPipeBenchmark. The kernel pipe-buffer (inBufferSize) is wired to it so the
// raw-vs-chunked comparison runs on identical transport conditions.
_options = options;
OptionsPreset = optionsPreset;
_serialized = AcBinarySerializer.Serialize(order, _options);
var pipeName = $"AcBinaryBenchRaw-{Guid.NewGuid():N}";
// === SERIALIZE-side setup measurement ===
// pipe-pair (server + client) + connect handshake. NO PipeWriter wrapper — we use the raw
// Stream.Write API directly, matching the no-framing semantics of this benchmark.
GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
var beforeSer = GC.GetAllocatedBytesForCurrentThread();
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte,
System.IO.Pipes.PipeOptions.Asynchronous,
inBufferSize: _options.BufferWriterChunkSize,
outBufferSize: _options.BufferWriterChunkSize);
_pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
var serverWait = _pipeServer.WaitForConnectionAsync();
_pipeClient.Connect();
serverWait.GetAwaiter().GetResult();
var afterSer = GC.GetAllocatedBytesForCurrentThread();
SetupSerializeAllocBytes = afterSer - beforeSer;
// === DESERIALIZE-side setup measurement ===
// 1× background consumer-task + 2× MRES (request / done) + cancellation source. Matches the
// chunked benchmark's deserialize-side setup cost shape.
GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
var beforeDes = GC.GetAllocatedBytesForCurrentThread();
_cts = new CancellationTokenSource();
_consumerTask = Task.Run(ConsumerLoop);
var afterDes = GC.GetAllocatedBytesForCurrentThread();
SetupDeserializeAllocBytes = afterDes - beforeDes;
}
// BG consumer: parks on _consumeRequest, reads N bytes from pipe, runs Deserialize(bytes), signals
// _consumeDone. The Read overlaps with the calling thread's Write through the kernel-pipe; Des happens
// sequentially after Read completes (raw byte[] needs the full message to deserialize).
private void ConsumerLoop()
{
var ct = _cts.Token;
try
{
while (true)
{
_consumeRequest.Wait(ct);
if (ct.IsCancellationRequested) return;
_consumeRequest.Reset();
try
{
var size = _pendingReadSize;
var bytes = new byte[size]; // per-iter alloc — counted by BenchmarkLoop.MeasureAllocationTotal
var totalRead = 0;
while (totalRead < size)
{
var n = _pipeServer.Read(bytes, totalRead, size - totalRead);
if (n == 0) break; // pipe closed / EOF — partial read swallowed
totalRead += n;
}
var result = AcBinaryDeserializer.Deserialize(bytes, _options);
if (_captureResult) _lastResult = result;
}
catch
{
// Swallow — calling thread sees the failure via missing/incorrect _lastResult during VerifyRoundTrip,
// or the benchmark loop just continues (timing impacted). Production teardown handled in Dispose.
}
finally
{
_consumeDone.Set();
}
}
}
catch (OperationCanceledException)
{
// Cooperative cancel — Dispose path. Swallow.
}
}
[MethodImpl(MethodImplOptions.NoInlining)]
public void Serialize()
{
// 2-task streaming pipeline:
// 1. Calling thread serialises → fresh byte[] (per-iter alloc, matches AcBinaryBenchmark contract).
// 2. Calling thread hands off expected size + signals consumer task. Consumer task starts Read loop
// on the pipe (BG thread). Calling thread proceeds to Write the bytes — Read and Write overlap
// through the kernel-pipe (kernel buffer fills, drains as consumer reads, sender resumes).
// 3. Calling thread waits for _consumeDone (consumer task finished Read+Des).
//
// Note: unlike chunked, raw byte[] cannot do Ser↔Des overlap (Des needs the full bytes before
// starting). Only Write↔Read overlaps here. The Des sequence on BG thread is: Read full bytes →
// Des the full graph → signal done. This is the architectural difference between raw and chunked.
var bytes = AcBinarySerializer.Serialize(_order, _options);
_pendingReadSize = bytes.Length;
_consumeDone.Reset();
_consumeRequest.Set();
_pipeClient.Write(bytes, 0, bytes.Length);
_pipeClient.Flush();
_consumeDone.Wait();
}
[MethodImpl(MethodImplOptions.NoInlining)]
public void Deserialize()
{
// No-op: per-iter round-trip is captured in Serialize(). See IsRoundTripOnly contract.
}
public bool VerifyRoundTrip()
{
// Use the same 2-task streaming path as the benchmark, but capture the result for graph-equality.
_captureResult = true;
try
{
Serialize();
var result = _lastResult as TestOrder;
return result != null && BenchmarkLoop.DeepEqualsViaJson(_order, result);
}
finally
{
_captureResult = false;
_lastResult = null;
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
// Cancel the consumer task → ConsumerLoop exits its Wait via OperationCanceledException.
try { _cts.Cancel(); } catch { /* swallow on teardown */ }
try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ }
try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
// Symmetric teardown — close client first (writer side), then server.
try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ }
try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ }
try { _cts.Dispose(); } catch { /* swallow on teardown */ }
}
}