[LOADED_DOCS: 3 files, no new loads]

Add NamedPipe round-trip benchmark & streaming infra

- Introduce AcBinaryNamedPipeBenchmark for long-lived NamedPipe round-trip measurement, simulating SignalR streaming.
- Add IoNamedPipe
This commit is contained in:
Loretta 2026-04-30 14:32:13 +02:00
parent 294a3e9609
commit 42b40a92c1
3 changed files with 379 additions and 31 deletions

View File

@ -7,6 +7,8 @@ using MessagePack.Resolvers;
using Microsoft.Extensions.Options;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
@ -45,6 +47,7 @@ public static class Program
private const string IoBufWrReuse = "BufWr reuse";
private const string IoBufWrNew = "BufWr new";
private const string IoString = "String";
private const string IoNamedPipe = "NamedPipe";
// Dispatch mode identifiers — describes how property access / type dispatch happens for a given run.
// SGen = compile-time source generator path (Unsafe.As<T> direct fields, slot-array wrapper lookup).
@ -156,11 +159,19 @@ public static class Program
foreach (var testData in testDataSets)
{
var preSerializers = CreateSerializers(testData);
foreach (var s in preSerializers)
try
{
// Light warmup just to trigger Tier 0 → Tier 1 promotion. The per-cell 5000-iter warmup
// inside RunBenchmarksForTestData still runs afterwards for cache/BTB warming.
s.Warmup(2000);
foreach (var s in preSerializers)
{
// Light warmup just to trigger Tier 0 → Tier 1 promotion. The per-cell 5000-iter warmup
// inside RunBenchmarksForTestData still runs afterwards for cache/BTB warming.
s.Warmup(2000);
}
}
finally
{
// Dispose any IDisposable serializers (NamedPipe / FileStream variants own OS resources).
foreach (var s in preSerializers) (s as IDisposable)?.Dispose();
}
}
// Let background tiered-JIT compilation drain before we begin measuring.
@ -285,26 +296,49 @@ public static class Program
OptionsPreset = serializer.OptionsPreset,
OptionsDescription = serializer.OptionsDescription,
SerializedSize = serializer.SerializedSize,
SetupAllocBytes = serializer.SetupAllocBytes
SetupAllocBytes = serializer.SetupAllocBytes,
IsRoundTripOnly = serializer.IsRoundTripOnly
};
if (mode is "all" or "serialize" or "ser")
if (serializer.IsRoundTripOnly)
{
result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations);
// Dedicated alloc-only sample (separate from timing samples; keeps timing pure)
result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations);
// Round-trip-only benchmarks (NamedPipe etc.): measure the full pipe round-trip directly into the RT
// columns. Ser ms / SerAlloc / Des ms / DesAlloc stay 0 → display as "N/A". Allocation uses the
// process-wide measurement so the server-drain-thread allocations (e.g. server-side new byte[len])
// also show up — otherwise current-thread alloc would only count the client side and look ~halved.
if (mode is "all" or "serialize" or "ser")
{
result.RoundTripTimeMs = RunTimed(() => serializer.Serialize(), TestIterations);
result.RoundTripAllocBytesPerOp = MeasureAllocationTotal(() => serializer.Serialize(), TestIterations);
}
// mode == "deserialize" alone is meaningless for a round-trip-only benchmark; skip silently.
}
if (mode is "all" or "deserialize" or "des")
else
{
result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations);
result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations);
if (mode is "all" or "serialize" or "ser")
{
result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations);
// Dedicated alloc-only sample (separate from timing samples; keeps timing pure)
result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations);
}
if (mode is "all" or "deserialize" or "des")
{
result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations);
result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations);
}
// Compose RT from Ser+Des (the previously computed property's behavior, now explicit since RT is settable).
result.RoundTripTimeMs = result.SerializeTimeMs + result.DeserializeTimeMs;
result.RoundTripAllocBytesPerOp = result.SerializeAllocBytesPerOp + result.DeserializeAllocBytesPerOp;
}
results.Add(result);
PrintResult(result);
}
// Dispose any IDisposable serializers (NamedPipe / FileStream variants own OS resources that must be released
// before the next test data builds new ones — otherwise pipes / handles leak across test cells).
foreach (var s in serializers) (s as IDisposable)?.Dispose();
return results;
}
@ -343,6 +377,14 @@ public static class Program
// internal buffer size; wire-format "chunks" only exist in AsyncPipeWriterOutput's chunked-framing mode.
new AcBinaryFreshBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"),
// AcBinary over a long-lived NamedPipe IPC connection — pipe set up ONCE, reused for every iteration.
// Per-iter cost = Byte[] serialize + 4-byte length-prefix framing + pipe write/read syscall + Byte[] deserialize.
// SignalR-style approximation: persistent connection + per-message round-trip + 4 KB initial buffer
// (Kestrel slab + TCP MTU aligned). Single-process loopback, so the number is a lower bound (real
// cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown in
// RT ms / RT Alloc; Ser ms / SerAlloc / Des ms / DesAlloc = N/A (IsRoundTripOnly).
new AcBinaryNamedPipeBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"),
// ============================================================
// MemoryPack — three I/O modes for apples-to-apples comparison
// ============================================================
@ -407,6 +449,25 @@ public static class Program
return (after - before) / iterations;
}
/// <summary>
/// Measures the average number of bytes allocated per invocation of <paramref name="action"/>,
/// counted across the whole process rather than just the calling thread.
/// <para>
/// Round-trip-only benchmarks (NamedPipe etc.) spread their work over several threads, so
/// <see cref="GC.GetAllocatedBytesForCurrentThread"/> would miss everything allocated off the caller
/// thread (for example the server-side <c>new byte[len]</c> receive buffers).
/// <see cref="GC.GetTotalAllocatedBytes"/> is used instead; it is somewhat noisier because unrelated
/// background allocations are included, which is expected to average out over many iterations.
/// </para>
/// </summary>
/// <param name="action">The operation to invoke repeatedly.</param>
/// <param name="iterations">How many times <paramref name="action"/> is invoked; also the divisor of the result.</param>
/// <returns>Process-wide allocated bytes during the run, divided by <paramref name="iterations"/> (integer division).</returns>
private static long MeasureAllocationTotal(Action action, int iterations)
{
    // Settle the heap first: collect, let pending finalizers run, then collect what they released.
    GC.Collect();
    GC.WaitForPendingFinalizers();
    GC.Collect();

    var baseline = GC.GetTotalAllocatedBytes(precise: true);

    for (var remaining = iterations; remaining > 0; remaining--)
    {
        action();
    }

    var allocatedDuringRun = GC.GetTotalAllocatedBytes(precise: true) - baseline;
    return allocatedDuringRun / iterations;
}
private static readonly JsonSerializerOptions VerifyJsonOpts = new()
{
WriteIndented = false,
@ -522,6 +583,11 @@ public static class Program
string? OptionsDescription => null;
/// <summary>One-time setup allocation cost (e.g., pre-allocated ArrayBufferWriter with internal buffer). Captured in constructor; 0 for byte[] API and Fresh-BufWriter variants.</summary>
long SetupAllocBytes { get; }
/// <summary>True when Serialize() does a full round-trip (e.g. NamedPipe) and Deserialize() is a no-op.
/// Used by the SUMMARY: WINNERS section to skip such cells from "Fastest Serialize" and "Fastest Deserialize"
/// rankings (because both metrics are misleading there) — they still participate in "Fastest Round-trip".
/// Default false for in-memory IO modes which measure Ser and Des separately.</summary>
bool IsRoundTripOnly => false;
void Warmup(int iterations);
void Serialize();
void Deserialize();
@ -745,6 +811,182 @@ public static class Program
}
}
/// <summary>
/// Benchmarks AcBinary over a long-lived NamedPipe IPC connection. The pipe pair is created ONCE in the
/// constructor and every iteration sends one length-prefixed payload through it, approximating a
/// SignalR-style scenario where a connection is opened once and reused for many messages.
///
/// <para><b>Architecture</b>:</para>
/// <list type="bullet">
/// <item>Constructor: creates a <see cref="NamedPipeServerStream"/> and a <see cref="NamedPipeClientStream"/>,
/// waits for them to connect, then starts a background drain task that reads length-prefixed messages from the
/// server end, deserializes them and posts the result to a <see cref="System.Threading.Channels.Channel{T}"/>.</item>
/// <item><see cref="Serialize"/>: encodes the payload via the Byte[] API, writes a 4-byte length prefix plus the
/// payload to the client end, then blocks until the drain task has posted the deserialized result.</item>
/// <item><see cref="Deserialize"/>: a no-op; the whole round-trip is performed inside <see cref="Serialize"/>
/// (see <see cref="IsRoundTripOnly"/>).</item>
/// </list>
///
/// <para><b>What this measures</b>: per-message Byte[] serialize + length-prefix framing + pipe write/read +
/// thread hand-off + Byte[] deserialize. Pipe creation and connection are NOT part of the per-iteration cost;
/// they happen once per instance in the constructor.</para>
///
/// <para><b>Approximation note</b>: both pipe ends live in the same process. Real cross-process or cross-machine
/// transports add their own latency on top, so treat the numbers as a lower bound.</para>
/// </summary>
private sealed class AcBinaryNamedPipeBenchmark : ISerializerBenchmark, IDisposable
{
    private readonly TestOrder _order;
    private readonly AcBinarySerializerOptions _options;
    private readonly byte[] _serialized; // only used for SerializedSize reporting

    // Long-lived pipe pair + server-side drain pump (all created once in the constructor).
    private readonly NamedPipeServerStream _pipeServer;
    private readonly NamedPipeClientStream _pipeClient;
    private readonly Task _drainTask;
    private readonly System.Threading.Channels.Channel<TestOrder?> _resultChannel;
    private bool _disposed;

    public string Engine => EngineAcBinary;
    public string IoMode => IoNamedPipe;
    public string DispatchMode => _options.UseGeneratedCode ? ModeSGen : ModeRuntime;
    public string OptionsPreset { get; }
    public int SerializedSize => _serialized.Length;
    public long SetupAllocBytes => 0;

    /// <summary>Always true: <see cref="Serialize"/> performs the full per-message round-trip and
    /// <see cref="Deserialize"/> is a no-op.</summary>
    public bool IsRoundTripOnly => true;

    public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,length-prefixed)";

    /// <summary>
    /// Creates the benchmark, connects the pipe pair and starts the server-side drain task.
    /// </summary>
    /// <param name="order">The object graph that is serialized on every iteration.</param>
    /// <param name="options">Serializer options; <c>BufferWriterChunkSize</c> is overridden to 4096.</param>
    /// <param name="optionsPreset">Display name of the options preset for the result table.</param>
    public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
    {
        _order = order;
        _options = options;
        // 4 KB initial buffer for the Byte[] API, chosen to mimic a SignalR-style per-message buffer profile.
        // NOTE(review): if AcBinarySerializerOptions is a reference type and the caller passes a shared
        // instance, this assignment also changes that instance for every other user — confirm.
        _options.BufferWriterChunkSize = 4096;
        OptionsPreset = optionsPreset;
        _serialized = AcBinarySerializer.Serialize(order, _options);

        // One-time setup — the pipe persists for the lifetime of this instance.
        // Byte mode (not Message mode): messages are framed here with a 4-byte length prefix.
        // A unique name per instance keeps multiple benchmark instances from colliding.
        var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}";
        var server = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
        NamedPipeClientStream? client = null;
        try
        {
            client = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);

            // Start the server-side wait first so the blocking client Connect() can complete against it.
            var serverWait = server.WaitForConnectionAsync();
            client.Connect();
            serverWait.GetAwaiter().GetResult();
        }
        catch
        {
            // Connection setup failed: Dispose() will never be called on a half-built instance,
            // so release the OS handles here before propagating the failure.
            client?.Dispose();
            server.Dispose();
            throw;
        }

        _pipeServer = server;
        _pipeClient = client;
        _resultChannel = System.Threading.Channels.Channel.CreateUnbounded<TestOrder?>();

        // Long-lived drain loop on the server side; runs until the pipe is closed.
        _drainTask = Task.Run(() => DrainLoopAsync());
    }

    /// <summary>
    /// Server-side pump: repeatedly reads one length-prefixed message, deserializes it and posts the result
    /// to the result channel. Ends on EOF, on a non-positive length prefix, or when the pipe is torn down.
    /// Any other failure is attached to the channel completion so the blocked caller sees the root cause
    /// (as the inner exception of the channel-closed error) instead of an unexplained closed channel.
    /// </summary>
    private async Task DrainLoopAsync()
    {
        Exception? failure = null;
        var lenBuf = new byte[4];
        try
        {
            while (true)
            {
                // 4-byte length prefix (ReadExactAsync loops over short reads).
                if (!await ReadExactAsync(_pipeServer, lenBuf, 0, 4).ConfigureAwait(false))
                    break;

                var len = BitConverter.ToInt32(lenBuf, 0);
                if (len <= 0) break; // sentinel / corruption guard

                var data = new byte[len];
                if (!await ReadExactAsync(_pipeServer, data, 0, len).ConfigureAwait(false))
                    break;

                var result = AcBinaryDeserializer.Deserialize<TestOrder>(data, _options);
                await _resultChannel.Writer.WriteAsync(result).ConfigureAwait(false);
            }
        }
        catch (Exception ex) when (ex is System.IO.IOException or ObjectDisposedException)
        {
            // Pipe closed or disposed — normal teardown path, not an error.
        }
        catch (Exception ex)
        {
            // Unexpected failure (e.g. thrown by the deserializer): hand it to the reader via the channel.
            failure = ex;
        }
        finally
        {
            // Always complete the channel so a caller blocked in the round-trip is released.
            _resultChannel.Writer.TryComplete(failure);
        }
    }

    /// <summary>Reads exactly <paramref name="count"/> bytes into <paramref name="buffer"/> starting at
    /// <paramref name="offset"/>; returns false if the stream reports end-of-stream before that.</summary>
    private static async Task<bool> ReadExactAsync(System.IO.Stream stream, byte[] buffer, int offset, int count)
    {
        var read = 0;
        while (read < count)
        {
            var n = await stream.ReadAsync(buffer.AsMemory(offset + read, count - read)).ConfigureAwait(false);
            if (n == 0) return false; // EOF
            read += n;
        }
        return true;
    }

    /// <summary>Runs <paramref name="iterations"/> full round-trips to warm up the code path.</summary>
    public void Warmup(int iterations)
    {
        for (var i = 0; i < iterations; i++)
        {
            Serialize();
        }
    }

    /// <summary>
    /// Performs one complete round-trip: serialize, send over the pipe, wait for the server-side result.
    /// The deserialized result is discarded.
    /// </summary>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Serialize()
    {
        SendAndAwaitResult();
    }

    /// <summary>No-op: the per-iteration round-trip is captured in <see cref="Serialize"/>.</summary>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public void Deserialize()
    {
    }

    /// <summary>Round-trips a single message and compares the result structurally with the input.</summary>
    /// <returns>True when the server produced a non-null result that equals the original order.</returns>
    public bool VerifyRoundTrip()
    {
        var result = SendAndAwaitResult();
        return result is not null && DeepEqualsViaJson(_order, result);
    }

    /// <summary>
    /// Shared round-trip body for <see cref="Serialize"/> and <see cref="VerifyRoundTrip"/>.
    /// </summary>
    /// <returns>The object deserialized by the server-side drain loop.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private TestOrder? SendAndAwaitResult()
    {
        // 1) Encode via the Byte[] API.
        var payload = AcBinarySerializer.Serialize(_order, _options);

        // 2) 4-byte length prefix — benchmark-side framing only. BitConverter writes in native byte order;
        //    the drain loop decodes with BitConverter in the same process, so both ends agree.
        //    Stack-allocated so the prefix causes no per-iteration heap allocation.
        Span<byte> lenBuf = stackalloc byte[4];
        BitConverter.TryWriteBytes(lenBuf, payload.Length);

        // 3) Synchronous writes to the client end of the pipe.
        _pipeClient.Write(lenBuf);
        _pipeClient.Write(payload, 0, payload.Length);
        _pipeClient.Flush();

        // 4) Block until the drain loop posts the deserialized result. If the drain loop has stopped,
        //    this throws instead of hanging because the loop always completes the channel on exit.
        return _resultChannel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Releases both pipe ends and waits (up to 5 seconds) for the drain task to finish.
    /// Safe to call more than once; teardown errors are deliberately ignored.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        // Closing the client ends the stream for the server; disposing the server aborts any pending read.
        // Either way the drain loop exits and completes the channel.
        try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
        try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
        try { _drainTask.Wait(TimeSpan.FromSeconds(5)); } catch { /* swallow on teardown */ }
    }
}
/// <summary>
/// Benchmarks MemoryPack via the IBufferWriter overload, allocating a FRESH ArrayBufferWriter on EVERY call.
/// Apples-to-apples counterpart to AcBinaryFreshBufferWriterBenchmark.
@ -974,6 +1216,11 @@ public static class Program
public string IoMode { get; set; } = "";
public string DispatchMode { get; set; } = "";
public string OptionsPreset { get; set; } = "";
/// <summary>True if Serialize() captures a full round-trip and Deserialize() is a no-op
/// (round-trip-only streaming transports like NamedPipe). Excluded from "Fastest Serialize" / "Fastest Deserialize"
/// winners rankings; still ranked in "Fastest Round-trip". Display-side: Ser ms / SerAlloc / Des ms / DesAlloc
/// all show "N/A" since they were never measured separately; RT ms / RT Alloc carry the full round-trip values.</summary>
public bool IsRoundTripOnly { get; set; }
/// <summary>Synthesized display name for backwards compatibility / single-string-row scenarios. Includes DispatchMode so SGen and Runtime variants of the same preset don't collide in grouping (e.g. SUMMARY: WINNERS).</summary>
public string SerializerName => $"{Engine} ({IoMode}, {OptionsPreset}, {DispatchMode})";
public string? OptionsDescription { get; set; }
@ -983,7 +1230,14 @@ public static class Program
public long SerializeAllocBytesPerOp { get; set; }
public long DeserializeAllocBytesPerOp { get; set; }
public long SetupAllocBytes { get; set; }
public double RoundTripTimeMs => SerializeTimeMs + DeserializeTimeMs;
/// <summary>Total round-trip time. For in-memory benchmarks: <c>Serialize + Deserialize</c> (set explicitly in
/// <c>RunBenchmarksForTestData</c>). For round-trip-only benchmarks (NamedPipe etc.): the directly-measured
/// pipe round-trip time, since Ser and Des are not separately measurable there.</summary>
public double RoundTripTimeMs { get; set; }
/// <summary>Total round-trip allocation per op. For in-memory benchmarks: <c>SerializeAlloc + DeserializeAlloc</c>.
/// For round-trip-only benchmarks: process-wide allocation measured via <see cref="GC.GetTotalAllocatedBytes"/>
/// (covers ALL threads — client, server-drain, channel internals — not just the caller).</summary>
public long RoundTripAllocBytesPerOp { get; set; }
}
private static void PrintResult(BenchmarkResult result)
@ -1025,9 +1279,9 @@ public static class Program
// The Runtime variant is shown alongside in the table for context, not used as the headline number.
var acBinaryResult = testResults.FirstOrDefault(r => (r.Engine == EngineAcBinary && r.IoMode == IoByteArray && r.DispatchMode == ModeSGen));
System.Console.WriteLine($"\n┌─ {testData.DisplayName} ─".PadRight(159, '─') + "┐");
System.Console.WriteLine($"│ {"#",-4} │ {"Engine",-11} │ {"Options",-22} │ {"IO",-12} │ {"Mode",-8} │ {"Setup",-8} │ {"Size",-8} │ {"Ser ms",-10} │ {"SerAlloc",-10} │ {"Des ms",-10} │ {"DesAlloc",-10} │ {"RT ms",-10} │");
System.Console.WriteLine($"├{"".PadRight(6, '─')}┼{"".PadRight(13, '─')}┼{"".PadRight(24, '─')}┼{"".PadRight(14, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┤");
System.Console.WriteLine($"\n┌─ {testData.DisplayName} ─".PadRight(172, '─') + "┐");
System.Console.WriteLine($"│ {"#",-4} │ {"Engine",-11} │ {"Options",-22} │ {"IO",-12} │ {"Mode",-8} │ {"Setup",-8} │ {"Size",-8} │ {"Ser ms",-10} │ {"SerAlloc",-10} │ {"Des ms",-10} │ {"DesAlloc",-10} │ {"RT ms",-10} │ {"RT Alloc",-10} │");
System.Console.WriteLine($"├{"".PadRight(6, '─')}┼{"".PadRight(13, '─')}┼{"".PadRight(24, '─')}┼{"".PadRight(14, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┤");
var rank = 1;
foreach (var result in testResults)
@ -1039,6 +1293,7 @@ public static class Program
var rt = result.RoundTripTimeMs > 0 ? $"{result.RoundTripTimeMs:F2} ms" : "N/A";
var serAlloc = result.SerializeTimeMs > 0 ? $"{result.SerializeAllocBytesPerOp:N0} B" : "N/A";
var desAlloc = result.DeserializeTimeMs > 0 ? $"{result.DeserializeAllocBytesPerOp:N0} B" : "N/A";
var rtAlloc = result.RoundTripAllocBytesPerOp > 0 ? $"{result.RoundTripAllocBytesPerOp:N0} B" : "N/A";
// Highlight MemoryPack baseline (any Byte[]) and AcBinary headline contender (Byte[] + SGen) with win/lose colors.
// The AcBinary Byte[]+Runtime variant is shown unhighlighted — it's contextual (SGen speed-up reference), not the headline.
@ -1063,7 +1318,7 @@ public static class Program
}
}
System.Console.WriteLine($"{prefix}{rank++,4} │ {result.Engine,-11} │ {result.OptionsPreset,-22} │ {result.IoMode,-12} │ {result.DispatchMode,-8} │ {setup,8} │ {size,8} │ {ser,10} │ {serAlloc,10} │ {des,10} │ {desAlloc,10} │ {rt,10}{suffix}");
System.Console.WriteLine($"{prefix}{rank++,4} │ {result.Engine,-11} │ {result.OptionsPreset,-22} │ {result.IoMode,-12} │ {result.DispatchMode,-8} │ {setup,8} │ {size,8} │ {ser,10} │ {serAlloc,10} │ {des,10} │ {desAlloc,10} │ {rt,10} │ {rtAlloc,10}{suffix}");
if (isHighlighted)
{
@ -1080,10 +1335,11 @@ public static class Program
var rtPct = memPackResult.RoundTripTimeMs > 0 ? (acBinaryResult.RoundTripTimeMs / memPackResult.RoundTripTimeMs - 1) * 100 : 0;
var serAllocPct = memPackResult.SerializeAllocBytesPerOp > 0 ? (acBinaryResult.SerializeAllocBytesPerOp / (double)memPackResult.SerializeAllocBytesPerOp - 1) * 100 : 0;
var desAllocPct = memPackResult.DeserializeAllocBytesPerOp > 0 ? (acBinaryResult.DeserializeAllocBytesPerOp / (double)memPackResult.DeserializeAllocBytesPerOp - 1) * 100 : 0;
var rtAllocPct = memPackResult.RoundTripAllocBytesPerOp > 0 ? (acBinaryResult.RoundTripAllocBytesPerOp / (double)memPackResult.RoundTripAllocBytesPerOp - 1) * 100 : 0;
// Footer separator: merge first 5 cols (#, Engine, Options, IO, Mode) → comparison label;
// remaining 7 cols stay aligned (Setup, Size, Ser ms, SerAlloc, Des ms, DesAlloc, RT ms).
System.Console.WriteLine($"├{"".PadRight(6, '─')}┴{"".PadRight(13, '─')}┴{"".PadRight(24, '─')}┴{"".PadRight(14, '─')}┴{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┤");
// remaining 8 cols stay aligned (Setup, Size, Ser ms, SerAlloc, Des ms, DesAlloc, RT ms, RT Alloc).
System.Console.WriteLine($"├{"".PadRight(6, '─')}┴{"".PadRight(13, '─')}┴{"".PadRight(24, '─')}┴{"".PadRight(14, '─')}┴{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(10, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┼{"".PadRight(12, '─')}┤");
// Merged label cell width = 4 + 11 + 22 + 12 + 8 + 4*3 (dropped separators) = 69
System.Console.Write($"│ {" AcBinary (Byte[]) vs MemoryPack (Byte[])",-69} │ ");
@ -1125,11 +1381,17 @@ public static class Program
System.Console.ForegroundColor = rtPct <= 0 ? ConsoleColor.Green : ConsoleColor.Red;
System.Console.Write($"{rtPct,+9:+0;-0}%");
System.Console.ResetColor();
System.Console.Write(" │ ");
// Round-trip Alloc
System.Console.ForegroundColor = rtAllocPct <= 0 ? ConsoleColor.Green : ConsoleColor.Red;
System.Console.Write($"{rtAllocPct,+9:+0;-0}%");
System.Console.ResetColor();
System.Console.WriteLine(" │");
}
// Closing line: merged on left (─ between cols 1-5), ┴ on the right (cols 6-12 boundary).
System.Console.WriteLine($"└{"".PadRight(6, '─')}─{"".PadRight(13, '─')}─{"".PadRight(24, '─')}─{"".PadRight(14, '─')}─{"".PadRight(10, '─')}┴{"".PadRight(10, '─')}┴{"".PadRight(10, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┘");
// Closing line: merged on left (─ between cols 1-5), ┴ on the right (cols 6-13 boundary, 8 unmerged cells).
System.Console.WriteLine($"└{"".PadRight(6, '─')}─{"".PadRight(13, '─')}─{"".PadRight(24, '─')}─{"".PadRight(14, '─')}─{"".PadRight(10, '─')}┴{"".PadRight(10, '─')}┴{"".PadRight(10, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┴{"".PadRight(12, '─')}┘");
//System.Console.WriteLine($"GrowBufferCount: {AcBinarySerializer.GrowBufferCount}");
//System.Console.WriteLine($"GrowBufferTotalBytes: {AcBinarySerializer.GrowBufferTotalBytes:N0} bytes");
}
@ -1143,8 +1405,9 @@ public static class Program
System.Console.WriteLine($"\n{"Category",-20} │ {"Winner",-40} │ {"Avg Value",-18}");
System.Console.WriteLine($"{"".PadRight(20, '─')}─┼─{"".PadRight(40, '─')}─┼─{"".PadRight(18, '─')}");
// Fastest Serialize
var fastestSer = results.Where(r => r.SerializeTimeMs > 0)
// Fastest Serialize — round-trip-only serializers (NamedPipe etc.) excluded:
// their Serialize() captures the full round-trip and isn't comparable to a pure Ser metric.
var fastestSer = results.Where(r => r.SerializeTimeMs > 0 && !r.IsRoundTripOnly)
.GroupBy(r => r.SerializerName)
.Select(g => new { Name = g.Key, AvgTime = g.Average(r => r.SerializeTimeMs) })
.OrderBy(x => x.AvgTime)
@ -1152,8 +1415,8 @@ public static class Program
if (fastestSer != null)
System.Console.WriteLine($"{"Fastest Serialize",-20} │ {fastestSer.Name,-40} │ {fastestSer.AvgTime,15:F2} ms");
// Fastest Deserialize
var fastestDes = results.Where(r => r.DeserializeTimeMs > 0)
// Fastest Deserialize — round-trip-only serializers excluded (their Deserialize() is a no-op).
var fastestDes = results.Where(r => r.DeserializeTimeMs > 0 && !r.IsRoundTripOnly)
.GroupBy(r => r.SerializerName)
.Select(g => new { Name = g.Key, AvgTime = g.Average(r => r.DeserializeTimeMs) })
.OrderBy(x => x.AvgTime)
@ -1317,13 +1580,13 @@ public static class Program
// CSV-like data for easy import (now includes per-op allocation columns)
sb.AppendLine("=== RAW DATA (CSV) ===");
sb.AppendLine("TestData,Engine,IO,Mode,Options,Size,SerializeMs,DeserializeMs,RoundTripMs,SerializeAllocBytesPerOp,DeserializeAllocBytesPerOp,SetupAllocBytes");
sb.AppendLine("TestData,Engine,IO,Mode,Options,Size,SerializeMs,DeserializeMs,RoundTripMs,SerializeAllocBytesPerOp,DeserializeAllocBytesPerOp,RoundTripAllocBytesPerOp,SetupAllocBytes");
foreach (var testData in testDataSets)
{
var testResults = results.Where(r => r.TestDataName == testData.DisplayName).ToList();
foreach (var result in testResults)
{
sb.AppendLine($"{result.TestDataName},{result.Engine},{result.IoMode},{result.DispatchMode},{result.OptionsPreset},{result.SerializedSize},{result.SerializeTimeMs:F2},{result.DeserializeTimeMs:F2},{result.RoundTripTimeMs:F2},{result.SerializeAllocBytesPerOp},{result.DeserializeAllocBytesPerOp},{result.SetupAllocBytes}");
sb.AppendLine($"{result.TestDataName},{result.Engine},{result.IoMode},{result.DispatchMode},{result.OptionsPreset},{result.SerializedSize},{result.SerializeTimeMs:F2},{result.DeserializeTimeMs:F2},{result.RoundTripTimeMs:F2},{result.SerializeAllocBytesPerOp},{result.DeserializeAllocBytesPerOp},{result.RoundTripAllocBytesPerOp},{result.SetupAllocBytes}");
}
}
sb.AppendLine();
@ -1460,8 +1723,8 @@ public static class Program
sb.AppendLine();
sb.AppendLine("## Results");
sb.AppendLine();
sb.AppendLine("TestData | Engine | IO | Mode | Options | Size(B) | Ser(ms) | Deser(ms) | RT(ms) | SerAlloc(B/op) | DesAlloc(B/op) | SetupAlloc(B)");
sb.AppendLine("---|---|---|---|---|---|---|---|---|---|---|---");
sb.AppendLine("TestData | Engine | IO | Mode | Options | Size(B) | Ser(ms) | Deser(ms) | RT(ms) | SerAlloc(B/op) | DesAlloc(B/op) | RTAlloc(B/op) | SetupAlloc(B)");
sb.AppendLine("---|---|---|---|---|---|---|---|---|---|---|---|---");
foreach (var testData in testDataSets)
{
@ -1478,8 +1741,9 @@ public static class Program
var rt = r.RoundTripTimeMs > 0 ? r.RoundTripTimeMs.ToString("F2", inv) : "-";
var serAlloc = r.SerializeTimeMs > 0 ? r.SerializeAllocBytesPerOp.ToString(inv) : "-";
var desAlloc = r.DeserializeTimeMs > 0 ? r.DeserializeAllocBytesPerOp.ToString(inv) : "-";
var rtAlloc = r.RoundTripAllocBytesPerOp > 0 ? r.RoundTripAllocBytesPerOp.ToString(inv) : "-";
var setupAlloc = r.SetupAllocBytes.ToString(inv);
sb.AppendLine($"{r.TestDataName} | {r.Engine} | {r.IoMode} | {r.DispatchMode} | {r.OptionsPreset} | {r.SerializedSize} | {ser} | {des} | {rt} | {serAlloc} | {desAlloc} | {setupAlloc}");
sb.AppendLine($"{r.TestDataName} | {r.Engine} | {r.IoMode} | {r.DispatchMode} | {r.OptionsPreset} | {r.SerializedSize} | {ser} | {des} | {rt} | {serAlloc} | {desAlloc} | {rtAlloc} | {setupAlloc}");
}
}

File diff suppressed because one or more lines are too long

View File

@ -164,3 +164,59 @@ Add static extension methods on `AcBinarySerializerOptions` for streaming file I
**Acceptance:**
- Large-file roundtrip test (≥ 100 MB) passes with memory profiler showing peak buffer ≤ 16 KB throughout.
- Full structural equality of round-tripped object.
## ACCORE-BIN-T-K3W7: Rename `BufferWriterChunkSize` to reflect actual semantics
**Priority:** P3 · **Type:** Refactor · **Breaking:** Yes (public option API)
The property name `BufferWriterChunkSize` is misleading: across the three output paths it does NOT consistently represent a "chunk".
| Output path | What `BufferWriterChunkSize` actually controls | Wire-format chunk? |
|---|---|---|
| `ArrayBinaryOutput` (Byte[] API) | Initial buffer capacity of the internal `byte[]` | No |
| `BufferWriterBinaryOutput` (IBufferWriter overload) | Internal buffer size — how much data accumulates before `Advance()` + new `GetMemory()` on the underlying writer | No |
| `AsyncPipeWriterOutput` (streaming) | Both internal buffer **and** wire-format chunk frame size for chunked framing | **Yes** (only here) |
| Receive side (`AsyncPipeReaderInput`, `SegmentBufferReader[Input]`) | Initial receive buffer = `BufferWriterChunkSize × 2` | No (just sizing hint) |
Only the streaming `AsyncPipeWriterOutput` path has a wire-format "chunk" concept (chunked framing for length-prefixed segments). On the other 75% of paths the property name reads as if the serializer were segmenting the payload, which is not what happens.
**Possible directions** (decide before implementing):
1. **Single rename, semantic-neutral** — `BufferWriterChunkSize` → `BufferWriterBufferSize` or `BufferWriterPageSize`. Minimal API surface change, single-property semantics preserved. Downside: still slightly off for the streaming path where there IS chunked framing.
2. **Two-property split** — `InternalBufferSize` (universal: how much data accumulates before Advance/Grow) + `StreamingChunkSize` (only meaningful for `AsyncPipeWriterOutput`; separate knob, defaults to `InternalBufferSize`). Cleanest semantics, most ceremony, slightly more options to document.
3. **No rename, streaming-honest** — Keep as `BufferWriterChunkSize` but document explicitly that on non-streaming paths the value is repurposed as buffer size. Cheapest change (docs only). Downside: doesn't fix the underlying confusion the field name causes.
Pick one before touching code. Option 2 is the most correct but adds API surface; Option 1 is the pragmatic middle.
**Affected callers / docs to update on rename:**
- `AcBinarySerializerOptions.cs` (definition)
- `AcBinarySerializer.cs` × 3 sites (`ArrayBinaryOutput` ctor, `BufferWriterBinaryOutput` ctor, `AsyncPipeWriterOutput` ctor)
- `AcBinaryDeserializer.cs` × 1 site (receive-side initial capacity derivation)
- `AsyncPipeReaderInput.cs`, `SegmentBufferReader.cs`, `SegmentBufferReaderInput.cs` — XML doc cross-refs
- `BINARY_WRITERS.md`, `BINARY_TODO.md` (this entry, plus the streaming-doctrine invariant in `ACCORE-BIN-T-B5Y6`), `BINARY_ISSUES.md` (line 151 — already lists `BufferWriterChunkSize` among the struct-mutation issue's affected setters)
- Consumer-side: `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` ctor mutates `_options.BufferWriterChunkSize = options.BufferSize;` — see `BINARY_ISSUES.md#accore-bin-i-...` (struct-mutation context). Coordinate the rename with the struct-mutation fix to avoid two cross-cutting churn waves on the same property.
**Acceptance:**
- Property renamed (or split) per the chosen direction; all internal references updated.
- XML docs reflect the actual semantics on each output path (initial capacity / advance threshold / chunk frame size — whichever applies).
- Consumer-side usage in `AcBinaryHubProtocol` updated; if Option 2 is chosen, the protocol uses `StreamingChunkSize` (the streaming knob), not the universal one.
- Wire format unchanged. Default values unchanged (65535 / equivalent).
- Migration note in CHANGELOG / release notes since this is a breaking change to `AcBinarySerializerOptions`.
## ACCORE-BIN-T-R5K2: Multi-message reuse for AsyncPipeReaderInput
**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ISSUES.md#accore-bin-i-q4t8`](BINARY_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported) — full Symptom / Root cause / Workarounds documented there; **do not duplicate here**.
Add a "next message" cursor / reset semantics so a long-lived `AsyncPipeReaderInput` can be reused across multiple `Deserialize<T>(input, opts)` calls without setting up a fresh instance per message. Removes the per-message ArrayPool rent + `ManualResetEventSlim` allocation + two `Task.Run` calls that the canonical pattern (`DeserializeFromPipeReaderAsync`) requires today, opening a true zero-alloc-per-message path on long-lived raw IPC transports (NamedPipe, FileStream, NetworkStream).
**Design candidates** (pick one — prototype first, measure the small-message zero-alloc claim before committing):
- **A. `Initialize` emits `_readPos` as starting position** (instead of always 0), and the sliding-window reset becomes "anytime `_readPos > 0` after a `Deserialize` completes, reset both `_writePos` and `_readPos` to 0". Smallest API change, no public surface added. Caveat: requires the deserializer to call `TryAdvanceSegment` at least once during message read so `_readPos` reflects the consumed boundary — small fully-buffered messages currently skip it entirely.
- **B. New `SetReadCursor(int position)` / `AdvanceReadTo(int position)` method**: caller (deserializer or wrapper) reports the consumed offset after each `Deserialize`. Sliding-window reset triggers explicitly. Cleaner separation of concerns (consumer knows where it stopped), but adds a public API surface.
- **C. `ResetCompletion()` for framed mode**: orthogonal to A/B — needed for framed multi-message reuse where the `[202]` CHUNK_END marker currently makes `_completed = true` irreversible. Combine with whichever cursor design is chosen.
**Acceptance:**
- New tests exercise `N` consecutive `Deserialize<T>(sharedInput, opts)` calls on the same instance, both raw and framed modes, with payload sizes both above and below the initial buffer capacity. All `N` results match their respective inputs (no buffer-position aliasing, no message-#1-duplicate-on-#2 regression).
- Existing `DeserializeFromPipeReaderAsync` unit tests continue to pass (single-message path unchanged).
- Wire format unchanged (this is consumer-side reader plumbing, not a wire-level change).
- Allocation profile of `N` consecutive reads on the shared input: **0 bytes per call after warmup** (ArrayPool rent reused across calls, no MRES per call, no `Task.Run` per call). The deserialized object graph allocations stay (those are user-visible).