192 lines
9.0 KiB
C#
192 lines
9.0 KiB
C#
using AyCode.Core.Serializers.Binaries;
|
|
using AyCode.Core.Tests.Serialization; // DrainFromAsync extension (test-only, used by benchmark)
|
|
using AyCode.Core.Tests.TestModels;
|
|
using System.IO.Pipelines;
|
|
using System.Runtime.CompilerServices;
|
|
|
|
namespace AyCode.Core.Benchmarks.Workloads.Scenarios;
|
|
|
|
/// <summary>
/// Same chunked-framed AsyncPipe code path as <see cref="AcBinaryNamedPipeBenchmark{T}"/>, but the transport
/// is an in-memory <see cref="System.IO.Pipelines.Pipe"/> instead of a kernel <c>NamedPipe</c>. The Pipe's
/// <c>Writer</c>/<c>Reader</c> pair is a managed-only zero-copy slab handoff — no syscalls, no kernel
/// buffer copy, no IRP queueing.
///
/// <para><b>Why this benchmark matters</b>: by holding ALL other variables constant (same SerializeChunkedFramed,
/// same AsyncPipeReaderInput, same drain task, same consumer task, same multi-message wire format), this
/// row isolates the <b>kernel-NamedPipe transport overhead</b> from the chunked-streaming framework's pure
/// CPU cost. The expected delta vs <see cref="AcBinaryNamedPipeBenchmark{T}"/>: per-chunk overhead drops from
/// ~25-30 µs (kernel-syscall pair + IRP) to ~1-2 µs (managed slab handoff). Multi-chunk Large-message rows
/// should converge dramatically toward <see cref="AcBinaryNamedPipeRawByteArrayBenchmark{T}"/>.</para>
///
/// <para><b>Real-world relevance</b>: in-memory Pipe is the typical primitive used for cross-thread serializer
/// pipelines inside a single process (e.g. SignalR's Kestrel transport adapter, gRPC framework internals,
/// custom message brokers). The numbers from this row reflect that scenario, NOT the kernel-pipe loopback
/// of the NamedPipe benchmark.</para>
///
/// <para><b>Threading model</b>: the constructor starts two background tasks — a drain task that pumps the
/// <see cref="PipeReader"/> into the <see cref="AsyncPipeReaderInput"/>, and a consumer task that runs
/// <c>Deserialize&lt;T&gt;</c> once per request. <see cref="Serialize"/> and the consumer hand off through a
/// request/done pair of <see cref="ManualResetEventSlim"/> instances, so only one <see cref="Serialize"/>
/// call may be in flight at a time.</para>
/// </summary>
/// <typeparam name="T">Reference type of the payload that is round-tripped on every iteration.</typeparam>
public sealed class AcBinaryInMemoryPipeBenchmark<T> : ISerializerBenchmark, IDisposable where T : class
{
    private readonly T _order;
    private readonly AcBinarySerializerOptions _options;
    private readonly byte[] _serialized; // for SerializedSize reporting only

    // Long-lived in-memory pipe lifecycle (set up once in ctor — NOT timed).
    private readonly Pipe _pipe;
    private readonly PipeWriter _pipeWriter;
    private readonly PipeReader _pipeReader;

    // Long-lived multi-message receive infrastructure (set up once in ctor) — same pattern as the NamedPipe
    // variant: drain pumps reader into AsyncPipeReaderInput, consumer task drives Deserialize<T>(input).
    private readonly AsyncPipeReaderInput _input;
    private readonly CancellationTokenSource _cts;
    private readonly Task _drainTask;
    private readonly Task _consumerTask;

    // Handshake between Serialize() (producer side) and ConsumeLoop() (consumer side):
    // _consumeRequest = "one message is about to be written, start deserializing",
    // _consumeDone    = "the deserialize attempt for that message has finished (success or failure)".
    private readonly ManualResetEventSlim _consumeRequest = new(false);
    private readonly ManualResetEventSlim _consumeDone = new(false);

    // Written by the consumer task only while _captureResult is set; read by VerifyRoundTrip after the
    // _consumeDone handshake.
    private object? _lastResult;
    private bool _captureResult;
    private bool _disposed;

    /// <summary>Serializer engine reported for this row; always <see cref="BenchmarkEngine.AcBinary"/>.</summary>
    public BenchmarkEngine Engine => BenchmarkEngine.AcBinary;

    /// <summary>I/O mode reported for this row; always <see cref="BenchmarkIoMode.InMemoryPipe"/>.</summary>
    public BenchmarkIoMode IoMode => BenchmarkIoMode.InMemoryPipe;

    /// <summary>
    /// <see cref="BenchmarkDispatchMode.SGen"/> when the options have <c>UseGeneratedCode</c> set,
    /// otherwise <see cref="BenchmarkDispatchMode.Runtime"/>.
    /// </summary>
    public BenchmarkDispatchMode DispatchMode => _options.UseGeneratedCode ? BenchmarkDispatchMode.SGen : BenchmarkDispatchMode.Runtime;

    /// <summary>The payload type <typeparamref name="T"/>.</summary>
    public Type OrderType => typeof(T);

    /// <summary>Name of the options preset, exactly as passed to the constructor.</summary>
    public string OptionsPreset { get; }

    /// <summary>Byte length of a one-shot (non-streamed) serialization of the payload, captured in the constructor.</summary>
    public int SerializedSize => _serialized.Length;

    /// <summary>
    /// Bytes allocated on the constructing thread while creating the <see cref="Pipe"/> and taking its writer.
    /// </summary>
    public long SetupSerializeAllocBytes { get; }

    /// <summary>
    /// Bytes allocated on the constructing thread while creating the receive side (input, cancellation source,
    /// and scheduling of the drain and consumer tasks). Measured with
    /// <see cref="GC.GetAllocatedBytesForCurrentThread"/>, so allocations made on other threads are not included.
    /// </summary>
    public long SetupDeserializeAllocBytes { get; }

    /// <summary>Always <c>true</c>: the whole round trip happens inside <see cref="Serialize"/>.</summary>
    public bool IsRoundTripOnly => true;

    /// <summary>Human-readable description of the options plus the buffer size and transport used by this row.</summary>
    public string OptionsDescription => BenchmarkOptions.BuildAcBinary(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=Pipe(in-memory,multiMessage,2-task)");

    /// <summary>
    /// Builds the long-lived pipe plus the drain/consumer tasks and records the setup allocation cost of each side.
    /// </summary>
    /// <param name="order">Payload instance that every iteration serializes and deserializes.</param>
    /// <param name="options">Serializer options shared by the write and the read side.</param>
    /// <param name="optionsPreset">Preset name, surfaced unchanged through <see cref="OptionsPreset"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="order"/> or <paramref name="options"/> is <c>null</c>.
    /// </exception>
    public AcBinaryInMemoryPipeBenchmark(T order, AcBinarySerializerOptions options, string optionsPreset)
    {
        // Fail fast, before any GC collection, pipe allocation or background task start-up. Without these
        // guards a null options only surfaces later as a NullReferenceException, and a null order can never
        // pass VerifyRoundTrip.
        ArgumentNullException.ThrowIfNull(order);
        ArgumentNullException.ThrowIfNull(options);

        _order = order;
        _options = options;
        OptionsPreset = optionsPreset;

        _serialized = AcBinarySerializer.Serialize(order, _options);

        // === SERIALIZE-side setup measurement ===
        // In-memory Pipe construction. NO kernel-pipe pair, NO Connect handshake — just a managed Pipe object
        // and a reference to its Writer side. PipeWriterImpl (parallel-flush capable, NOT StreamPipeWriter).
        GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
        var beforeSer = GC.GetAllocatedBytesForCurrentThread();
        _pipe = new Pipe();
        _pipeWriter = _pipe.Writer;
        var afterSer = GC.GetAllocatedBytesForCurrentThread();
        SetupSerializeAllocBytes = afterSer - beforeSer;

        // === DESERIALIZE-side setup measurement ===
        // PipeReader reference + AsyncPipeReaderInput (ArrayPool rent + ManualResetEventSlim) + drain task +
        // consumer task scaffolding. Identical to the NamedPipe variant on the receive side.
        GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();
        var beforeDes = GC.GetAllocatedBytesForCurrentThread();

        _pipeReader = _pipe.Reader;
        _input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true);
        _cts = new CancellationTokenSource();
        _drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token));
        _consumerTask = Task.Run(ConsumeLoop);

        var afterDes = GC.GetAllocatedBytesForCurrentThread();
        SetupDeserializeAllocBytes = afterDes - beforeDes;
    }

    // BG consumer: parks on _consumeRequest, runs Deserialize<T>(_input) when signaled, signals _consumeDone.
    // Mirror of AcBinaryNamedPipeBenchmark.ConsumeLoop — same pattern, same MRES protocol.
    private void ConsumeLoop()
    {
        var ct = _cts.Token;
        try
        {
            while (true)
            {
                _consumeRequest.Wait(ct);

                // Dispose() cancels first and then pulses _consumeRequest, so a wake-up may be the
                // shutdown nudge rather than a real request.
                if (ct.IsCancellationRequested) return;

                // Re-arm for the next request. Serialize() cannot issue another Set() until it has
                // observed _consumeDone, which is only signaled below.
                _consumeRequest.Reset();

                try
                {
                    var result = AcBinaryDeserializer.Deserialize<T>(_input, _options);
                    if (_captureResult) _lastResult = result;
                }
                catch
                {
                    // Swallow — see ConsumeLoop in NamedPipe variant for rationale.
                }
                finally
                {
                    // Always release the producer, even when deserialization failed, so Serialize()
                    // never blocks forever on _consumeDone.Wait().
                    _consumeDone.Set();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cooperative cancel — Dispose path. Swallow.
        }
    }

    /// <summary>
    /// Runs one full round trip: wakes the consumer, streams the payload into the pipe, then blocks until the
    /// consumer reports that its deserialize attempt has finished.
    /// </summary>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Serialize()
    {
        // Same 2-task streaming pipeline as NamedPipe variant — only the transport differs (in-memory Pipe
        // instead of kernel NamedPipe). Per-chunk SerializeChunkedFramed → PipeWriter slab → drain task
        // reads from PipeReader → input.Feed → consumer Deserialize<T> consumes byte-by-byte.
        //
        // Uses the Pipe-overload (instead of the PipeWriter-overload) so the FlushPolicy parameter is
        // exposed for tuning. Toggle between FlushPolicy.PerChunk (bounded peak memory, per-chunk await
        // FlushAsync) and FlushPolicy.Coalesced (fire-and-forget per chunk, pipe-coalesced flushes up to
        // PauseWriterThreshold ~64 KB) to A/B-test the streaming-pipeline overhead. FlushPolicy.PerChunk
        // is functionally equivalent to the PipeWriter-overload (both internally route to
        // SerializeToPipeWriterCore with FlushPolicy.PerChunk).
        _consumeDone.Reset();
        _consumeRequest.Set();

        AcBinarySerializer.SerializeChunkedFramed(_order, _pipe, _options, FlushPolicy.Coalesced);

        _consumeDone.Wait();
    }

    /// <summary>Intentionally empty; see <see cref="IsRoundTripOnly"/>.</summary>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Deserialize()
    {
        // No-op: per-iter round-trip is captured in Serialize(). See IsRoundTripOnly contract.
    }

    /// <summary>
    /// Performs one round trip with result capture enabled and compares the deserialized instance with the
    /// original payload.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the consumer produced a non-null <typeparamref name="T"/> that
    /// <c>RoundTripValidator.DeepEqualsViaJson</c> considers equal to the original; otherwise <c>false</c>
    /// (including the case where the consumer's deserialize attempt threw and nothing was captured).
    /// </returns>
    public bool VerifyRoundTrip()
    {
        _captureResult = true;
        try
        {
            Serialize();
            var result = _lastResult as T;
            return result != null && RoundTripValidator.DeepEqualsViaJson(_order, result);
        }
        finally
        {
            // Back to the non-capturing hot-path configuration and drop the reference to the captured graph.
            _captureResult = false;
            _lastResult = null;
        }
    }

    /// <summary>
    /// Stops the background tasks and releases the pipe, the input and the synchronization primitives.
    /// Best-effort: every teardown step swallows its own failure. Repeated calls are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        // Cancel drain + consumer tasks → both exit. Pulse _consumeRequest in case consumer is parked.
        try { _cts.Cancel(); } catch { /* swallow on teardown */ }
        try { _consumeRequest.Set(); } catch { /* nudge in case consumer Wait is parked */ }
        try { _drainTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
        try { _consumerTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }

        // Complete writer + reader (in-memory Pipe — no underlying stream to dispose).
        // The synchronous Complete() is used for the writer: it avoids wrapping CompleteAsync() in a Task
        // and blocking on it (sync-over-async) for what is an in-memory state change.
        try { _pipeWriter.Complete(); } catch { /* swallow on teardown */ }
        try { _pipeReader.Complete(); } catch { /* swallow on teardown */ }
        try { _input.Dispose(); } catch { /* swallow on teardown */ }
        try { _consumeRequest.Dispose(); } catch { /* swallow on teardown */ }
        try { _consumeDone.Dispose(); } catch { /* swallow on teardown */ }
        try { _cts.Dispose(); } catch { /* swallow on teardown */ }
    }
}
|