From 42b40a92c1f9b7c59568ea836578d9634f9e993a Mon Sep 17 00:00:00 2001 From: Loretta Date: Thu, 30 Apr 2026 14:32:13 +0200 Subject: [PATCH] [LOADED_DOCS: 3 files, no new loads] Add NamedPipe round-trip benchmark & streaming infra - Introduce AcBinaryNamedPipeBenchmark for long-lived NamedPipe round-trip measurement, simulating SignalR streaming. - Add IoNamedPipe --- AyCode.Core.Serializers.Console/Program.cs | 326 +++++++++++++++++++-- AyCode.Core/docs/BINARY/BINARY_ISSUES.md | 28 ++ AyCode.Core/docs/BINARY/BINARY_TODO.md | 56 ++++ 3 files changed, 379 insertions(+), 31 deletions(-) diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index 4c7a0e2..bc5a25c 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -7,6 +7,8 @@ using MessagePack.Resolvers; using Microsoft.Extensions.Options; using System.Buffers; using System.Diagnostics; +using System.IO.Pipelines; +using System.IO.Pipes; using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; @@ -45,6 +47,7 @@ public static class Program private const string IoBufWrReuse = "BufWr reuse"; private const string IoBufWrNew = "BufWr new"; private const string IoString = "String"; + private const string IoNamedPipe = "NamedPipe"; // Dispatch mode identifiers — describes how property access / type dispatch happens for a given run. // SGen = compile-time source generator path (Unsafe.As direct fields, slot-array wrapper lookup). @@ -156,11 +159,19 @@ public static class Program foreach (var testData in testDataSets) { var preSerializers = CreateSerializers(testData); - foreach (var s in preSerializers) + try { - // Light warmup just to trigger Tier 0 → Tier 1 promotion. The per-cell 5000-iter warmup - // inside RunBenchmarksForTestData still runs afterwards for cache/BTB warming. - s.Warmup(2000); + foreach (var s in preSerializers) + { + // Light warmup just to trigger Tier 0 → Tier 1 promotion. The per-cell 5000-iter warmup + // inside RunBenchmarksForTestData still runs afterwards for cache/BTB warming. + s.Warmup(2000); + } + } + finally + { + // Dispose any IDisposable serializers (NamedPipe / FileStream variants own OS resources). + foreach (var s in preSerializers) (s as IDisposable)?.Dispose(); } } // Let background tiered-JIT compilation drain before we begin measuring. @@ -285,26 +296,49 @@ public static class Program OptionsPreset = serializer.OptionsPreset, OptionsDescription = serializer.OptionsDescription, SerializedSize = serializer.SerializedSize, - SetupAllocBytes = serializer.SetupAllocBytes + SetupAllocBytes = serializer.SetupAllocBytes, + IsRoundTripOnly = serializer.IsRoundTripOnly }; - if (mode is "all" or "serialize" or "ser") + if (serializer.IsRoundTripOnly) { - result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations); - // Dedicated alloc-only sample (separate from timing samples; keeps timing pure) - result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations); + // Round-trip-only benchmarks (NamedPipe etc.): measure the full pipe round-trip directly into the RT + // columns. Ser ms / SerAlloc / Des ms / DesAlloc stay 0 → display as "N/A". Allocation uses the + // process-wide measurement so the server-drain-thread allocations (e.g. server-side new byte[len]) + // also show up — otherwise current-thread alloc would only count the client side and look ~halved. + if (mode is "all" or "serialize" or "ser") + { + result.RoundTripTimeMs = RunTimed(() => serializer.Serialize(), TestIterations); + result.RoundTripAllocBytesPerOp = MeasureAllocationTotal(() => serializer.Serialize(), TestIterations); + } + // mode == "deserialize" alone is meaningless for a round-trip-only benchmark; skip silently. } - - if (mode is "all" or "deserialize" or "des") + else { - result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations); - result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations); + if (mode is "all" or "serialize" or "ser") + { + result.SerializeTimeMs = RunTimed(() => serializer.Serialize(), TestIterations); + // Dedicated alloc-only sample (separate from timing samples; keeps timing pure) + result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations); + } + if (mode is "all" or "deserialize" or "des") + { + result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations); + result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations); + } + // Compose RT from Ser+Des (the previously computed property's behavior, now explicit since RT is settable). + result.RoundTripTimeMs = result.SerializeTimeMs + result.DeserializeTimeMs; + result.RoundTripAllocBytesPerOp = result.SerializeAllocBytesPerOp + result.DeserializeAllocBytesPerOp; } results.Add(result); PrintResult(result); } + // Dispose any IDisposable serializers (NamedPipe / FileStream variants own OS resources that must be released + // before the next test data builds new ones — otherwise pipes / handles leak across test cells). + foreach (var s in serializers) (s as IDisposable)?.Dispose(); + return results; } @@ -343,6 +377,14 @@ public static class Program // internal buffer size; wire-format "chunks" only exist in AsyncPipeWriterOutput's chunked-framing mode. new AcBinaryFreshBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"), + // AcBinary over a long-lived NamedPipe IPC connection — pipe set up ONCE, reused for every iteration. + // Per-iter cost = Byte[] serialize + 4-byte length-prefix framing + pipe write/read syscall + Byte[] deserialize. + // SignalR-style approximation: persistent connection + per-message round-trip + 4 KB initial buffer + // (Kestrel slab + TCP MTU aligned). Single-process loopback, so the number is a lower bound (real + // cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown in + // Ser ms, Des ms = N/A (IsRoundTripOnly). + new AcBinaryNamedPipeBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"), + // ============================================================ // MemoryPack — three I/O modes for apples-to-apples comparison // ============================================================ @@ -407,6 +449,25 @@ public static class Program return (after - before) / iterations; } + /// + /// Process-wide allocation measurement — needed for round-trip-only benchmarks (NamedPipe etc.) where + /// the work happens across multiple threads. would + /// only count the caller-thread allocations, missing the server-side new byte[len] buffers and + /// any drain-pump-thread allocations. covers the entire process. + /// Slightly noisier than the per-thread variant (background threads / GC bookkeeping leak in), but + /// over 1000 iterations the signal dominates. + /// + private static long MeasureAllocationTotal(Action action, int iterations) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + var before = GC.GetTotalAllocatedBytes(precise: true); + for (var i = 0; i < iterations; i++) action(); + var after = GC.GetTotalAllocatedBytes(precise: true); + return (after - before) / iterations; + } + private static readonly JsonSerializerOptions VerifyJsonOpts = new() { WriteIndented = false, @@ -522,6 +583,11 @@ public static class Program string? OptionsDescription => null; /// One-time setup allocation cost (e.g., pre-allocated ArrayBufferWriter with internal buffer). Captured in constructor; 0 for byte[] API and Fresh-BufWriter variants. long SetupAllocBytes { get; } + /// True when Serialize() does a full round-trip (e.g. NamedPipe) and Deserialize() is a no-op. + /// Used by the SUMMARY: WINNERS section to skip such cells from "Fastest Serialize" and "Fastest Deserialize" + /// rankings (because both metrics are misleading there) — they still participate in "Fastest Round-trip". + /// Default false for in-memory IO modes which measure Ser and Des separately. + bool IsRoundTripOnly => false; void Warmup(int iterations); void Serialize(); void Deserialize(); @@ -745,6 +811,182 @@ public static class Program } } + /// + /// Benchmarks AcBinary over a long-lived NamedPipe IPC connection — pipe is set up ONCE in the constructor; + /// each iteration only sends a length-prefixed payload through the existing pipe. Closer to a real SignalR-style + /// scenario where the connection is established at process start and reused for many messages, rather than the + /// pathological one-pipe-per-message setup overhead. + /// + /// Architecture: + /// + /// Constructor: sets up + , + /// waits for connection, starts a long-lived background drain task on the server side that reads length-prefixed + /// messages and pushes deserialized results into a . + /// Per-iteration : encodes the payload via the Byte[] API, writes a 4-byte length + /// prefix + payload bytes to the pipe, then awaits the channel for the server-deserialized result. + /// is a no-op (the round-trip happens inside Serialize); same IsRoundTripOnly contract + /// as the previous one-shot variant. + /// + /// + /// What this measures: per-message Byte[] serialize + length-prefix framing + pipe write/read syscall + + /// kernel context switch + Byte[] deserialize. NOT measured: pipe lifecycle (one-time setup amortized over all iterations + /// and across all test data cells, since this benchmark runs against many cells). + /// + /// Approximation note: this is a single-process loopback pipe. Real cross-process or cross-machine SignalR + /// will add transport latency (TCP, WebSocket framing) on top of these numbers. The benchmark gives a lower bound for + /// streaming/IPC scenarios. + /// + private sealed class AcBinaryNamedPipeBenchmark : ISerializerBenchmark, IDisposable + { + private readonly TestOrder _order; + private readonly AcBinarySerializerOptions _options; + private readonly byte[] _serialized; // for SerializedSize reporting + + // Long-lived pipe + drain pump (set up once in ctor) + private readonly NamedPipeServerStream _pipeServer; + private readonly NamedPipeClientStream _pipeClient; + private readonly Task _drainTask; + private readonly System.Threading.Channels.Channel _resultChannel; + private bool _disposed; + + public string Engine => EngineAcBinary; + public string IoMode => IoNamedPipe; + public string DispatchMode => _options.UseGeneratedCode ? ModeSGen : ModeRuntime; + public string OptionsPreset { get; } + public int SerializedSize => _serialized.Length; + public long SetupAllocBytes => 0; + public bool IsRoundTripOnly => true; // Serialize() does the full per-message round-trip; Deserialize() is a no-op + public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,length-prefixed)"; + + public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) + { + _order = order; + _options = options; + // SignalR-aligned 4 KB initial buffer for the Byte[] API — matches Kestrel slab + TCP MTU, + // simulates the realistic per-message buffer profile the SignalR transport ends up with. + // (The 65535 default is fine for big batch encoding but over-allocates on small messages.) + _options.BufferWriterChunkSize = 4096; + OptionsPreset = optionsPreset; + _serialized = AcBinarySerializer.Serialize(order, _options); + + // 1× setup — pipe persists for the lifetime of the benchmark instance. + // Byte mode (not Message mode) — we frame messages ourselves with a 4-byte length prefix. + // PipeOptions.Asynchronous → enables overlapped I/O on Windows; harmless on Linux/macOS. + var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}"; + _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); + _pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous); + + // Establish the connection. Server async-wait + client connect happen in parallel. + var serverWait = _pipeServer.WaitForConnectionAsync(); + _pipeClient.Connect(); + serverWait.GetAwaiter().GetResult(); + + _resultChannel = System.Threading.Channels.Channel.CreateUnbounded(); + + // Long-lived drain loop on the server side. Reads length-prefixed messages until the pipe is closed. + _drainTask = Task.Run(async () => + { + var lenBuf = new byte[4]; + try + { + while (true) + { + // Read 4-byte length prefix (handle short reads in a loop) + if (!await ReadExactAsync(_pipeServer, lenBuf, 0, 4).ConfigureAwait(false)) + break; + var len = BitConverter.ToInt32(lenBuf, 0); + if (len <= 0) break; // sentinel / corruption guard + var data = new byte[len]; + if (!await ReadExactAsync(_pipeServer, data, 0, len).ConfigureAwait(false)) + break; + + var result = AcBinaryDeserializer.Deserialize(data, _options); + await _resultChannel.Writer.WriteAsync(result).ConfigureAwait(false); + } + } + catch (Exception ex) when (ex is System.IO.IOException or ObjectDisposedException) + { + // pipe closed — normal teardown path + } + finally + { + _resultChannel.Writer.TryComplete(); + } + }); + } + + /// Reads exactly bytes; returns false if pipe closed before completion. + private static async Task ReadExactAsync(System.IO.Stream stream, byte[] buffer, int offset, int count) + { + var read = 0; + while (read < count) + { + var n = await stream.ReadAsync(buffer.AsMemory(offset + read, count - read)).ConfigureAwait(false); + if (n == 0) return false; // EOF + read += n; + } + return true; + } + + public void Warmup(int iterations) + { + for (var i = 0; i < iterations; i++) + { + Serialize(); + } + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Serialize() + { + // 1) Byte[] encode (same path as the IoByteArray benchmark) + var payload = AcBinarySerializer.Serialize(_order, _options); + + // 2) Length-prefix framing (4 bytes little-endian) — pure benchmark-side framing, not an AcBinary feature. + // Stack-allocated to avoid per-iter heap traffic for the prefix. + Span lenBuf = stackalloc byte[4]; + BitConverter.TryWriteBytes(lenBuf, payload.Length); + + // 3) Sync write to the pipe — Stream.Write blocks until the OS accepts the bytes into the pipe buffer. + _pipeClient.Write(lenBuf); + _pipeClient.Write(payload, 0, payload.Length); + _pipeClient.Flush(); + + // 4) Wait for the server drain loop to deserialize and post the result. Sync wait via channel reader. + // A console app has no SynchronizationContext, so .GetAwaiter().GetResult() is deadlock-safe. + _resultChannel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult(); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void Deserialize() + { + // No-op: per-iter round-trip is captured in Serialize(). See IsRoundTripOnly contract. + } + + public bool VerifyRoundTrip() + { + // Round-trip a single message and compare structurally. + var payload = AcBinarySerializer.Serialize(_order, _options); + Span lenBuf = stackalloc byte[4]; + BitConverter.TryWriteBytes(lenBuf, payload.Length); + _pipeClient.Write(lenBuf); + _pipeClient.Write(payload, 0, payload.Length); + _pipeClient.Flush(); + var result = _resultChannel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult(); + return result != null && DeepEqualsViaJson(_order, result); + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + // Closing the client triggers EOF on the server's ReadAsync → drain loop exits gracefully. + try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ } + try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ } + try { _drainTask.Wait(TimeSpan.FromSeconds(5)); } catch { /* swallow on teardown */ } + } + } + /// /// Benchmarks MemoryPack via the IBufferWriter overload, allocating a FRESH ArrayBufferWriter on EVERY call. /// Apples-to-apples counterpart to AcBinaryFreshBufferWriterBenchmark. @@ -974,6 +1216,11 @@ public static class Program public string IoMode { get; set; } = ""; public string DispatchMode { get; set; } = ""; public string OptionsPreset { get; set; } = ""; + /// True if Serialize() captures a full round-trip and Deserialize() is a no-op + /// (single-use streaming transports like NamedPipe). Excluded from "Fastest Serialize" / "Fastest Deserialize" + /// winners rankings; still ranked in "Fastest Round-trip". Display-side: Ser ms / SerAlloc / Des ms / DesAlloc + /// all show "N/A" since they were never measured separately; RT ms / RT Alloc carry the full round-trip values. + public bool IsRoundTripOnly { get; set; } /// Synthesized display name for backwards compatibility / single-string-row scenarios. Includes DispatchMode so SGen and Runtime variants of the same preset don't collide in grouping (e.g. SUMMARY: WINNERS). public string SerializerName => $"{Engine} ({IoMode}, {OptionsPreset}, {DispatchMode})"; public string? OptionsDescription { get; set; } @@ -983,7 +1230,14 @@ public static class Program public long SerializeAllocBytesPerOp { get; set; } public long DeserializeAllocBytesPerOp { get; set; } public long SetupAllocBytes { get; set; } - public double RoundTripTimeMs => SerializeTimeMs + DeserializeTimeMs; + /// Total round-trip time. For in-memory benchmarks: Serialize + Deserialize (set explicitly in + /// RunBenchmarksForTestData). For round-trip-only benchmarks (NamedPipe etc.): the directly-measured + /// pipe round-trip time, since Ser and Des are not separately measurable there. + public double RoundTripTimeMs { get; set; } + /// Total round-trip allocation per op. For in-memory benchmarks: SerializeAlloc + DeserializeAlloc. + /// For round-trip-only benchmarks: process-wide allocation measured via + /// (covers ALL threads — client, server-drain, channel internals — not just the caller). + public long RoundTripAllocBytesPerOp { get; set; } } private static void PrintResult(BenchmarkResult result) @@ -1025,9 +1279,9 @@ public static class Program // The Runtime variant is shown alongside in the table for context, not used as the headline number. var acBinaryResult = testResults.FirstOrDefault(r => (r.Engine == EngineAcBinary && r.IoMode == IoByteArray && r.DispatchMode == ModeSGen)); - System.Console.WriteLine($"\n┌─ {testData.DisplayName} ─".PadRight(159, '─') + "┐"); - System.Console.WriteLine($"│ {"#",-4} │ {"Engine",-11} │ {"Options",-22} │ {"IO",-12} │ {"Mode",-8} │ {"Setup",-8} │ {"Size",-8} │ {"Ser ms",-10} │ {"SerAlloc",-10} │ {"Des ms",-10} │ {"DesAlloc",-10} │ {"RT ms",-10} │"); - System.Console.WriteLine($"├{"─".PadRight(6, '─')}┼{"─".PadRight(13, '─')}┼{"─".PadRight(24, '─')}┼{"─".PadRight(14, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┤"); + System.Console.WriteLine($"\n┌─ {testData.DisplayName} ─".PadRight(172, '─') + "┐"); + System.Console.WriteLine($"│ {"#",-4} │ {"Engine",-11} │ {"Options",-22} │ {"IO",-12} │ {"Mode",-8} │ {"Setup",-8} │ {"Size",-8} │ {"Ser ms",-10} │ {"SerAlloc",-10} │ {"Des ms",-10} │ {"DesAlloc",-10} │ {"RT ms",-10} │ {"RT Alloc",-10} │"); + System.Console.WriteLine($"├{"─".PadRight(6, '─')}┼{"─".PadRight(13, '─')}┼{"─".PadRight(24, '─')}┼{"─".PadRight(14, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┤"); var rank = 1; foreach (var result in testResults) @@ -1039,6 +1293,7 @@ public static class Program var rt = result.RoundTripTimeMs > 0 ? $"{result.RoundTripTimeMs:F2} ms" : "N/A"; var serAlloc = result.SerializeTimeMs > 0 ? $"{result.SerializeAllocBytesPerOp:N0} B" : "N/A"; var desAlloc = result.DeserializeTimeMs > 0 ? $"{result.DeserializeAllocBytesPerOp:N0} B" : "N/A"; + var rtAlloc = result.RoundTripAllocBytesPerOp > 0 ? $"{result.RoundTripAllocBytesPerOp:N0} B" : "N/A"; // Highlight MemoryPack baseline (any Byte[]) and AcBinary headline contender (Byte[] + SGen) with win/lose colors. // The AcBinary Byte[]+Runtime variant is shown unhighlighted — it's contextual (SGen speed-up reference), not the headline. @@ -1063,7 +1318,7 @@ public static class Program } } - System.Console.WriteLine($"{prefix}{rank++,4} │ {result.Engine,-11} │ {result.OptionsPreset,-22} │ {result.IoMode,-12} │ {result.DispatchMode,-8} │ {setup,8} │ {size,8} │ {ser,10} │ {serAlloc,10} │ {des,10} │ {desAlloc,10} │ {rt,10}{suffix}"); + System.Console.WriteLine($"{prefix}{rank++,4} │ {result.Engine,-11} │ {result.OptionsPreset,-22} │ {result.IoMode,-12} │ {result.DispatchMode,-8} │ {setup,8} │ {size,8} │ {ser,10} │ {serAlloc,10} │ {des,10} │ {desAlloc,10} │ {rt,10} │ {rtAlloc,10}{suffix}"); if (isHighlighted) { @@ -1080,10 +1335,11 @@ public static class Program var rtPct = memPackResult.RoundTripTimeMs > 0 ? (acBinaryResult.RoundTripTimeMs / memPackResult.RoundTripTimeMs - 1) * 100 : 0; var serAllocPct = memPackResult.SerializeAllocBytesPerOp > 0 ? (acBinaryResult.SerializeAllocBytesPerOp / (double)memPackResult.SerializeAllocBytesPerOp - 1) * 100 : 0; var desAllocPct = memPackResult.DeserializeAllocBytesPerOp > 0 ? (acBinaryResult.DeserializeAllocBytesPerOp / (double)memPackResult.DeserializeAllocBytesPerOp - 1) * 100 : 0; + var rtAllocPct = memPackResult.RoundTripAllocBytesPerOp > 0 ? (acBinaryResult.RoundTripAllocBytesPerOp / (double)memPackResult.RoundTripAllocBytesPerOp - 1) * 100 : 0; // Footer separator: merge first 5 cols (#, Engine, Options, IO, Mode) → comparison label; - // remaining 7 cols stay aligned (Setup, Size, Ser ms, SerAlloc, Des ms, DesAlloc, RT ms). - System.Console.WriteLine($"├{"─".PadRight(6, '─')}┴{"─".PadRight(13, '─')}┴{"─".PadRight(24, '─')}┴{"─".PadRight(14, '─')}┴{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┤"); + // remaining 8 cols stay aligned (Setup, Size, Ser ms, SerAlloc, Des ms, DesAlloc, RT ms, RT Alloc). + System.Console.WriteLine($"├{"─".PadRight(6, '─')}┴{"─".PadRight(13, '─')}┴{"─".PadRight(24, '─')}┴{"─".PadRight(14, '─')}┴{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(10, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┼{"─".PadRight(12, '─')}┤"); // Merged label cell width = 4 + 11 + 22 + 12 + 8 + 4*3 (dropped separators) = 69 System.Console.Write($"│ {"► AcBinary (Byte[]) vs MemoryPack (Byte[])",-69} │ "); @@ -1125,11 +1381,17 @@ public static class Program System.Console.ForegroundColor = rtPct <= 0 ? ConsoleColor.Green : ConsoleColor.Red; System.Console.Write($"{rtPct,+9:+0;-0}%"); System.Console.ResetColor(); + System.Console.Write(" │ "); + + // Round-trip Alloc + System.Console.ForegroundColor = rtAllocPct <= 0 ? ConsoleColor.Green : ConsoleColor.Red; + System.Console.Write($"{rtAllocPct,+9:+0;-0}%"); + System.Console.ResetColor(); System.Console.WriteLine(" │"); } - // Closing line: merged on left (─ between cols 1-5), ┴ on the right (cols 6-12 boundary). - System.Console.WriteLine($"└{"─".PadRight(6, '─')}─{"─".PadRight(13, '─')}─{"─".PadRight(24, '─')}─{"─".PadRight(14, '─')}─{"─".PadRight(10, '─')}┴{"─".PadRight(10, '─')}┴{"─".PadRight(10, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┘"); + // Closing line: merged on left (─ between cols 1-5), ┴ on the right (cols 6-13 boundary, 8 unmerged cells). + System.Console.WriteLine($"└{"─".PadRight(6, '─')}─{"─".PadRight(13, '─')}─{"─".PadRight(24, '─')}─{"─".PadRight(14, '─')}─{"─".PadRight(10, '─')}┴{"─".PadRight(10, '─')}┴{"─".PadRight(10, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┴{"─".PadRight(12, '─')}┘"); //System.Console.WriteLine($"GrowBufferCount: {AcBinarySerializer.GrowBufferCount}"); //System.Console.WriteLine($"GrowBufferTotalBytes: {AcBinarySerializer.GrowBufferTotalBytes:N0} bytes"); } @@ -1143,8 +1405,9 @@ public static class Program System.Console.WriteLine($"\n{"Category",-20} │ {"Winner",-40} │ {"Avg Value",-18}"); System.Console.WriteLine($"{"─".PadRight(20, '─')}─┼─{"─".PadRight(40, '─')}─┼─{"─".PadRight(18, '─')}"); - // Fastest Serialize - var fastestSer = results.Where(r => r.SerializeTimeMs > 0) + // Fastest Serialize — round-trip-only serializers (NamedPipe etc.) excluded: + // their Serialize() captures the full round-trip and isn't comparable to a pure Ser metric. + var fastestSer = results.Where(r => r.SerializeTimeMs > 0 && !r.IsRoundTripOnly) .GroupBy(r => r.SerializerName) .Select(g => new { Name = g.Key, AvgTime = g.Average(r => r.SerializeTimeMs) }) .OrderBy(x => x.AvgTime) @@ -1152,8 +1415,8 @@ public static class Program if (fastestSer != null) System.Console.WriteLine($"{"Fastest Serialize",-20} │ {fastestSer.Name,-40} │ {fastestSer.AvgTime,15:F2} ms"); - // Fastest Deserialize - var fastestDes = results.Where(r => r.DeserializeTimeMs > 0) + // Fastest Deserialize — round-trip-only serializers excluded (their Deserialize() is a no-op). + var fastestDes = results.Where(r => r.DeserializeTimeMs > 0 && !r.IsRoundTripOnly) .GroupBy(r => r.SerializerName) .Select(g => new { Name = g.Key, AvgTime = g.Average(r => r.DeserializeTimeMs) }) .OrderBy(x => x.AvgTime) @@ -1317,13 +1580,13 @@ public static class Program // CSV-like data for easy import (now includes per-op allocation columns) sb.AppendLine("=== RAW DATA (CSV) ==="); - sb.AppendLine("TestData,Engine,IO,Mode,Options,Size,SerializeMs,DeserializeMs,RoundTripMs,SerializeAllocBytesPerOp,DeserializeAllocBytesPerOp,SetupAllocBytes"); + sb.AppendLine("TestData,Engine,IO,Mode,Options,Size,SerializeMs,DeserializeMs,RoundTripMs,SerializeAllocBytesPerOp,DeserializeAllocBytesPerOp,RoundTripAllocBytesPerOp,SetupAllocBytes"); foreach (var testData in testDataSets) { var testResults = results.Where(r => r.TestDataName == testData.DisplayName).ToList(); foreach (var result in testResults) { - sb.AppendLine($"{result.TestDataName},{result.Engine},{result.IoMode},{result.DispatchMode},{result.OptionsPreset},{result.SerializedSize},{result.SerializeTimeMs:F2},{result.DeserializeTimeMs:F2},{result.RoundTripTimeMs:F2},{result.SerializeAllocBytesPerOp},{result.DeserializeAllocBytesPerOp},{result.SetupAllocBytes}"); + sb.AppendLine($"{result.TestDataName},{result.Engine},{result.IoMode},{result.DispatchMode},{result.OptionsPreset},{result.SerializedSize},{result.SerializeTimeMs:F2},{result.DeserializeTimeMs:F2},{result.RoundTripTimeMs:F2},{result.SerializeAllocBytesPerOp},{result.DeserializeAllocBytesPerOp},{result.RoundTripAllocBytesPerOp},{result.SetupAllocBytes}"); } } sb.AppendLine(); @@ -1460,8 +1723,8 @@ public static class Program sb.AppendLine(); sb.AppendLine("## Results"); sb.AppendLine(); - sb.AppendLine("TestData | Engine | IO | Mode | Options | Size(B) | Ser(ms) | Deser(ms) | RT(ms) | SerAlloc(B/op) | DesAlloc(B/op) | SetupAlloc(B)"); - sb.AppendLine("---|---|---|---|---|---|---|---|---|---|---|---"); + sb.AppendLine("TestData | Engine | IO | Mode | Options | Size(B) | Ser(ms) | Deser(ms) | RT(ms) | SerAlloc(B/op) | DesAlloc(B/op) | RTAlloc(B/op) | SetupAlloc(B)"); + sb.AppendLine("---|---|---|---|---|---|---|---|---|---|---|---|---"); foreach (var testData in testDataSets) { @@ -1478,8 +1741,9 @@ public static class Program var rt = r.RoundTripTimeMs > 0 ? r.RoundTripTimeMs.ToString("F2", inv) : "-"; var serAlloc = r.SerializeTimeMs > 0 ? r.SerializeAllocBytesPerOp.ToString(inv) : "-"; var desAlloc = r.DeserializeTimeMs > 0 ? r.DeserializeAllocBytesPerOp.ToString(inv) : "-"; + var rtAlloc = r.RoundTripAllocBytesPerOp > 0 ? r.RoundTripAllocBytesPerOp.ToString(inv) : "-"; var setupAlloc = r.SetupAllocBytes.ToString(inv); - sb.AppendLine($"{r.TestDataName} | {r.Engine} | {r.IoMode} | {r.DispatchMode} | {r.OptionsPreset} | {r.SerializedSize} | {ser} | {des} | {rt} | {serAlloc} | {desAlloc} | {setupAlloc}"); + sb.AppendLine($"{r.TestDataName} | {r.Engine} | {r.IoMode} | {r.DispatchMode} | {r.OptionsPreset} | {r.SerializedSize} | {ser} | {des} | {rt} | {serAlloc} | {desAlloc} | {rtAlloc} | {setupAlloc}"); } } diff --git a/AyCode.Core/docs/BINARY/BINARY_ISSUES.md b/AyCode.Core/docs/BINARY/BINARY_ISSUES.md index fa07ee6..dbdf005 100644 --- a/AyCode.Core/docs/BINARY/BINARY_ISSUES.md +++ b/AyCode.Core/docs/BINARY/BINARY_ISSUES.md @@ -39,6 +39,34 @@ The scratch buffer is `ArrayPool.Rent`-ed on first cross-boundary read and reuse **Possible optimization:** Span-by-span UTF-8 decode for cross-boundary strings (like MessagePack). Low priority — most strings are shorter than a segment (4KB). +### ACCORE-BIN-I-Q4T8: AsyncPipeReaderInput multi-message reuse not supported + +**Status:** Open +**Affects:** `AsyncPipeReaderInput` (both raw mode `stripChunkFraming: false` and framed mode) +**Reach:** any long-lived pipe transport (NamedPipe, FileStream, NetworkStream) that needs to stream multiple AcBinary messages over a single connection without setting up a fresh input per message. + +**Symptom:** Calling `AcBinaryDeserializer.Deserialize(input, opts)` repeatedly on the same long-lived `AsyncPipeReaderInput` instance silently corrupts data on the second and subsequent calls. In raw mode the second call re-reads the FIRST message's bytes from buffer position 0; in framed mode the `[202]` CHUNK_END marker leaves `_completed = true` permanently, breaking any further reads. + +**Root cause:** + +1. `AsyncPipeReaderInput.Initialize(out buffer, out position, out bufferLength)` always emits `position = 0` — no "consumed offset" / "next-message-start" cursor is preserved between calls. + +2. The sliding-window reset-to-0 in `AppendToBuffer` (`if (rp > 0 && rp == _writePos) → reset both to 0`) only fires when the consumer has caught up via `TryAdvanceSegment`. For small messages that fit entirely inside the initial buffer capacity (4-8 KB), `TryAdvanceSegment` is **never** called during deserialization — so `_readPos` stays at 0, the reset never fires, and a second `Deserialize(input)` call starts reading the same buffer from offset 0 again (= duplicate of message #1). + +3. `Complete()` (called explicitly OR implicitly via the `[202]` CHUNK_END marker in framed mode) sets `_completed = true` irreversibly. There is no Reset / Reopen method. + +**Workarounds:** + +- **Per-message fresh `AsyncPipeReaderInput` instance** (the canonical pattern, used internally by `DeserializeFromPipeReaderAsync`): `using var input = new AsyncPipeReaderInput(...);` + drain Task + deserialization Task per message. Cost: one ArrayPool rent + one `ManualResetEventSlim` allocation per message + two `Task.Run` per message. Acceptable for typical IPC consumers but precludes any zero-alloc-per-message streaming on long-lived raw transports. + +- **Multiplexed wire format** (`SerializeChunkedFramed` + `AsyncPipeReaderInput(stripChunkFraming: true)`): a parser layer above (e.g. `AcBinaryHubProtocol.TryParseChunkData`) parses `[201][UINT16][data]` frames and feeds individual messages into per-message inputs. SignalR's `BinaryProtocolMode.AsyncSegment` works this way. The `[202]`-end-of-message rule still applies, so each message logically needs its own input — but the parser can keep the wire-level reader long-lived. Implies pipeline parallelism (multiple in-flight messages), which may not match every consumer's needs. + +**Cross-references:** +- ADR `docs/adr/0003-acbinary-streaming-receive-architecture.md` — origin of the streaming-receive architecture +- `AcBinaryDeserializer.DeserializeFromPipeReaderAsync` — reference single-message usage pattern +- `AsyncPipeReaderInput.Initialize` / `TryAdvanceSegment` / `AppendToBuffer` — the three sites that together enforce single-use semantics +- Tracked fix: [`BINARY_TODO.md#accore-bin-t-r5k2`](BINARY_TODO.md#accore-bin-t-r5k2-multi-message-reuse-for-asyncpipereaderinput) + ## Serialization ### ACCORE-BIN-I-K8R4: BufferWriterBinaryOutput fallback path allocates per-chunk diff --git a/AyCode.Core/docs/BINARY/BINARY_TODO.md b/AyCode.Core/docs/BINARY/BINARY_TODO.md index 3525f47..c8b1d09 100644 --- a/AyCode.Core/docs/BINARY/BINARY_TODO.md +++ b/AyCode.Core/docs/BINARY/BINARY_TODO.md @@ -164,3 +164,59 @@ Add static extension methods on `AcBinarySerializerOptions` for streaming file I **Acceptance:** - Large-file roundtrip test (≥ 100 MB) passes with memory profiler showing peak buffer ≤ 16 KB throughout. - Full structural equality of round-tripped object. + +## ACCORE-BIN-T-K3W7: Rename `BufferWriterChunkSize` to reflect actual semantics +**Priority:** P3 · **Type:** Refactor · **Breaking:** Yes (public option API) + +The property name `BufferWriterChunkSize` is misleading: across the three output paths it does NOT consistently represent a "chunk". + +| Output path | What `BufferWriterChunkSize` actually controls | Wire-format chunk? | +|---|---|---| +| `ArrayBinaryOutput` (Byte[] API) | Initial buffer capacity of the internal `byte[]` | No | +| `BufferWriterBinaryOutput` (IBufferWriter overload) | Internal buffer size — how much data accumulates before `Advance()` + new `GetMemory()` on the underlying writer | No | +| `AsyncPipeWriterOutput` (streaming) | Both internal buffer **and** wire-format chunk frame size for chunked framing | **Yes** (only here) | +| Receive side (`AsyncPipeReaderInput`, `SegmentBufferReader[Input]`) | Initial receive buffer = `BufferWriterChunkSize × 2` | No (just sizing hint) | + +Only the streaming `AsyncPipeWriterOutput` path has a wire-format "chunk" concept (chunked framing for length-prefixed segments). On the other 75% of paths the property name reads as if the serializer were segmenting the payload, which is not what happens. + +**Possible directions** (decide before implementing): + +1. **Single rename, semantic-neutral** — `BufferWriterChunkSize` → `BufferWriterBufferSize` or `BufferWriterPageSize`. Minimal API surface change, single-property semantics preserved. Downside: still slightly off for the streaming path where there IS chunked framing. +2. **Two-property split** — `InternalBufferSize` (universal: how much data accumulates before Advance/Grow) + `StreamingChunkSize` (only meaningful for `AsyncPipeWriterOutput`; separate knob, defaults to `InternalBufferSize`). Cleanest semantics, most ceremony, slightly more options to document. +3. **Single rename, streaming-honest** — Keep as `BufferWriterChunkSize` but document explicitly that on non-streaming paths the value is repurposed as buffer size. Cheapest change (docs only). Downside: doesn't fix the underlying confusion the field name causes. + +Pick one before touching code. Option 2 is the most correct but adds API surface; Option 1 is the pragmatic middle. + +**Affected callers / docs to update on rename:** +- `AcBinarySerializerOptions.cs` (definition) +- `AcBinarySerializer.cs` × 3 sites (`ArrayBinaryOutput` ctor, `BufferWriterBinaryOutput` ctor, `AsyncPipeWriterOutput` ctor) +- `AcBinaryDeserializer.cs` × 1 site (receive-side initial capacity derivation) +- `AsyncPipeReaderInput.cs`, `SegmentBufferReader.cs`, `SegmentBufferReaderInput.cs` — XML doc cross-refs +- `BINARY_WRITERS.md`, `BINARY_TODO.md` (this entry, plus the streaming-doctrine invariant in `ACCORE-BIN-T-B5Y6`), `BINARY_ISSUES.md` (line 151 — already lists `BufferWriterChunkSize` among the struct-mutation issue's affected setters) +- Consumer-side: `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` ctor mutates `_options.BufferWriterChunkSize = options.BufferSize;` — see `BINARY_ISSUES.md#accore-bin-i-...` (struct-mutation context). Coordinate the rename with the struct-mutation fix to avoid two cross-cutting churn waves on the same property. + +**Acceptance:** +- Property renamed (or split) per the chosen direction; all internal references updated. +- XML docs reflect the actual semantics on each output path (initial capacity / advance threshold / chunk frame size — whichever applies). +- Consumer-side usage in `AcBinaryHubProtocol` updated; if Option 2 is chosen, the protocol uses `StreamingChunkSize` (the streaming knob), not the universal one. +- Wire format unchanged. Default values unchanged (65535 / equivalent). +- Migration note in CHANGELOG / release notes since this is a breaking change to `AcBinarySerializerOptions`. + +## ACCORE-BIN-T-R5K2: Multi-message reuse for AsyncPipeReaderInput +**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ISSUES.md#accore-bin-i-q4t8`](BINARY_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported) — full Symptom / Root cause / Workarounds documented there; **do not duplicate here**. + +Add a "next message" cursor / reset semantics so a long-lived `AsyncPipeReaderInput` can be reused across multiple `Deserialize(input, opts)` calls without setting up a fresh instance per message. Removes the per-message ArrayPool rent + `ManualResetEventSlim` allocation + two `Task.Run` calls that the canonical pattern (`DeserializeFromPipeReaderAsync`) requires today, opening a true zero-alloc-per-message path on long-lived raw IPC transports (NamedPipe, FileStream, NetworkStream). + +**Design candidates** (pick one — prototype first, measure the small-message zero-alloc claim before committing): + +- **A. `Initialize` emits `_readPos` as starting position** (instead of always 0), and the sliding-window reset becomes "anytime `_readPos > 0` after a `Deserialize` completes, reset both `_writePos` and `_readPos` to 0". Smallest API change, no public surface added. Caveat: requires the deserializer to call `TryAdvanceSegment` at least once during message read so `_readPos` reflects the consumed boundary — small fully-buffered messages currently skip it entirely. + +- **B. New `SetReadCursor(int position)` / `AdvanceReadTo(int position)` method**: caller (deserializer or wrapper) reports the consumed offset after each `Deserialize`. Sliding-window reset triggers explicitly. Cleaner separation of concerns (consumer knows where it stopped), but adds a public API surface. + +- **C. `ResetCompletion()` for framed mode**: orthogonal to A/B — needed for framed multi-message reuse where the `[202]` CHUNK_END marker currently makes `_completed = true` irreversible. Combine with whichever cursor design is chosen. + +**Acceptance:** +- New tests exercise `N` consecutive `Deserialize(sharedInput, opts)` calls on the same instance, both raw and framed modes, with payload sizes both above and below the initial buffer capacity. All `N` results match their respective inputs (no buffer-position aliasing, no message-#1-duplicate-on-#2 regression). +- Existing `DeserializeFromPipeReaderAsync` unit tests continue to pass (single-message path unchanged). +- Wire format unchanged (this is consumer-side reader plumbing, not a wire-level change). +- Allocation profile of `N` consecutive reads on the shared input: **0 bytes per call after warmup** (ArrayPool rent reused across calls, no MRES per call, no `Task.Run` per call). The deserialized object graph allocations stay (those are user-visible).