From 6dbeae988415e9fedf4e47eb954d596998d0f6f1 Mon Sep 17 00:00:00 2001 From: Loretta Date: Fri, 1 May 2026 06:37:08 +0200 Subject: [PATCH] [LOADED_DOCS: 3 files, no new loads] Centralize pipe chunk size and fix buffer reset race Centralized pipe chunk size config for all AcBinary pipe benchmarks, ensuring app-level and kernel buffer sizes stay in sync. Updated AsyncPipeReaderInput.MessageDone to atomically reset both _readPos and _writePos, preventing stale buffer reads. Improved comments and applied AggressiveOptimization to key methods. Adjusted AcquireChunk to ensure wire chunk fits exactly, avoiding kernel fragmentation. Updated related tests and documentation. --- AyCode.Core.Serializers.Console/Program.cs | 62 ++++++++++++------- .../AcBinarySerializerPipeParallelTests.cs | 24 ++++--- .../Binaries/AsyncPipeReaderInput.cs | 33 ++++++++-- .../Binaries/AsyncPipeWriterOutput.cs | 15 ++++- 4 files changed, 95 insertions(+), 39 deletions(-) diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs index 8000830..1f6ef6b 100644 --- a/AyCode.Core.Serializers.Console/Program.cs +++ b/AyCode.Core.Serializers.Console/Program.cs @@ -354,6 +354,21 @@ public static class Program var binaryFastModeNoSgenOption = AcBinarySerializerOptions.FastMode; binaryFastModeNoSgenOption.UseGeneratedCode = false; + // Pipe-aligned max chunk size for the IBufferWriter / NamedPipe variants — matches + // AsyncPipeWriterOutput.MaxChunkSize (UINT16 max = 65535), the largest payload that fits in one + // [201][UINT16][data] wire frame. 
The same value also drives the kernel pipe buffer in the + // NamedPipeServerStream ctor (inBufferSize/outBufferSize) so the app-level chunk and the + // kernel-level transfer unit stay in sync — one WriteFile(chunkSize) syscall fits blocking-free in + // one kernel pipe-buffer slot, eliminating the page-segmentation in-syscall stall that plagued + // the previous 4 KB profile (where a 65 KB user-space chunk would still get sliced 16× inside + // the kernel because the default kernel pipe buffer is page-sized). + // Centralised here so ALL pipe-style benchmarks (BufWr new, NamedPipe) share a single source of + // truth — change ONLY THIS line when tuning the pipe chunk size, never inside individual benchmark + // ctors. Earlier 4 KB-baseline measurements remain comparable via the archived .LLM logs in + // Test_Benchmark_Results/Benchmark/. + var binaryFastModePipeChunk = AcBinarySerializerOptions.FastMode; + //AsyncPipeWriterOutput.MaxChunkSize; + return new List { // ============================================================ @@ -373,18 +388,17 @@ public static class Program new AcBinaryBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode"), // AcBinary via IBufferWriter (FRESH ArrayBufferWriter per call — one-shot scenario). - // BufferWriterChunkSize=4096 → AcBinary advances every 4 KB (smaller internal buffer = sooner Advance/GetMemory cycle, - // matches Kestrel slab + TCP MTU). Despite the property name "ChunkSize", in the IBufferWriter path this is just the - // internal buffer size; wire-format "chunks" only exist in AsyncPipeWriterOutput's chunked-framing mode. - new AcBinaryFreshBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"), + // PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method). 
+ new AcBinaryFreshBufferWriterBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"), // AcBinary over a long-lived NamedPipe IPC connection — pipe set up ONCE, reused for every iteration. - // Per-iter cost = Byte[] serialize + 4-byte length-prefix framing + pipe write/read syscall + Byte[] deserialize. - // SignalR-style approximation: persistent connection + per-message round-trip + 4 KB initial buffer - // (Kestrel slab + TCP MTU aligned). Single-process loopback, so the number is a lower bound (real - // cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown in - // Ser ms, Des ms = N/A (IsRoundTripOnly). - new AcBinaryNamedPipeBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"), + // PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method) — + // same value drives BOTH the app-level wire chunk AND the kernel pipe buffer (inBufferSize/outBufferSize + // in the NamedPipeServerStream ctor). Persistent connection + multi-message wire framing + max-size + // chunks aligned with the kernel transfer unit. Single-process loopback, so the number is a lower bound + // (real cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown + // in RT ms, Ser/Des = N/A (IsRoundTripOnly). + new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"), // ============================================================ // MemoryPack — three I/O modes for apples-to-apples comparison @@ -774,12 +788,10 @@ public static class Program public AcBinaryFreshBufferWriterBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) { _order = order; + // BufferWriterChunkSize comes from the caller (central source of truth in CreateSerializers + // — the binaryFastModePipeChunk options instance). 
Do NOT mutate _options here; tune the chunk + // size in CreateSerializers only. _options = options; - // Override: 4 KB internal buffer instead of 65535 default — controls how often AcBinary advances - // (Advance + GetMemory) on the underlying IBufferWriter. Smaller buffer = sooner advance = matches - // Kestrel slab + TCP MTU for streaming. NOT a wire-format chunk size (that exists only in - // AsyncPipeWriterOutput's chunked-framing mode); on ArrayBufferWriter this is purely the grow step. - _options.BufferWriterChunkSize = 4096; OptionsPreset = optionsPreset; _serialized = AcBinarySerializer.Serialize(order, _options); } @@ -874,18 +886,26 @@ public static class Program public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset) { _order = order; + // BufferWriterChunkSize comes from the caller (central source of truth in CreateSerializers + // — the binaryFastModePipeChunk options instance). Do NOT mutate _options here; tune the chunk + // size in CreateSerializers only. _options = options; - // EXPERIMENTAL: 64 KB chunk size (UINT16 max) — minimises per-chunk FlushAsync syscalls on NamedPipe. - // Diagnostic comparison vs the 4 KB SignalR-realistic profile to see how much the chunk-flush count - // dominates the NamedPipe overhead. Restore to 4096 (Kestrel/TCP-MTU aligned) once we have the data. - _options.BufferWriterChunkSize = AsyncPipeWriterOutput.MaxChunkSize; OptionsPreset = optionsPreset; _serialized = AcBinarySerializer.Serialize(order, _options); - // 1× pipe setup + // 1× pipe setup. 
Kernel-side pipe buffer (inBufferSize / outBufferSize on the server ctor — the + // client inherits the server-defined buffer size at connect time) matches BufferWriterChunkSize + // exactly: AsyncPipeWriterOutput now treats chunkSize as the chunk-on-wire total size (header + + // data), so one WriteFile(chunkSize) syscall lands in exactly one kernel-page slot — page-aligned, + // no fragmentation, no IRP reordering. 
_options.BufferWriterChunkSize is the single tunable source. var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}"; - _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); + + _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, + System.IO.Pipes.PipeOptions.Asynchronous, + inBufferSize: _options.BufferWriterChunkSize, + outBufferSize: _options.BufferWriterChunkSize); + _pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous); var serverWait = _pipeServer.WaitForConnectionAsync(); diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs index ef60d10..a031c4a 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs @@ -262,15 +262,18 @@ public class AcBinarySerializerPipeParallelTests [TestMethod] public void Feed_ChunkEndMarker_AutoResetsForNextMessage() { - // [202] CHUNK_END is end-of-MESSAGE, NOT end-of-session. The input auto-resets so the same - // long-lived instance can deserialize the next message on the same stream — see - // BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix. Session end is signalled separately by - // an external Complete() call (or stream-EOF on the underlying transport). + // [202] CHUNK_END is end-of-MESSAGE on the WIRE — NOT end-of-session and NOT, by itself, + // a buffer-cursor recycle. 
On [202], the framing-state machine resets to AwaitingHeader so + // the next [201] header is parsed correctly; buffer-cursor recycling is performed separately by + // the consumer via MessageDone() (typically from the AcBinaryDeserializer.Deserialize( + // AsyncPipeReaderInput, opts) finally block, AFTER the deserialiser has finished reading + // the structurally-complete graph). See BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix. + // Session end is signalled separately by an external Complete() / stream-EOF. using var input = new AsyncPipeReaderInput(64); // Message 1 input.Feed(WrapInChunkFrame([1, 2, 3])); - input.Feed([202]); // CHUNK_END marker — auto-reset, NOT completion + input.Feed([202]); // CHUNK_END — framing reset only (no buffer-cursor recycle, no completion) // First message is consumable input.Initialize(out var buf1, out var pos1, out var bufLen1); @@ -279,16 +282,19 @@ public class AcBinarySerializerPipeParallelTests Assert.AreEqual(2, buf1[1]); Assert.AreEqual(3, buf1[2]); - // Consume the bytes (simulate deserializer): reports position = 3 to producer via TryAdvanceSegment. - // Consumer NOT yet at end-of-session, so this should NOT immediately return false — but since the - // [202] reset _readPos to _writePos (= 3), the next AppendToBuffer for message 2 will recycle to 0. + // Simulate the AcBinaryDeserializer.Deserialize(input, opts) finally block: the consumer + // calls MessageDone() AFTER it has finished reading the graph. This resets both + // _readPos and _writePos to 0, so the next AppendToBuffer for message 2 writes from + // offset 0 of the buffer (no sentinel, no cycling needed). + input.MessageDone(); // Message 2 — same long-lived input, just keeps going input.Feed(WrapInChunkFrame([10, 20, 30, 40])); input.Feed([202]); // Re-initialize for the next deserializer call — the buffer was recycled to 0 by the - // sliding-window cycling triggered when AppendToBuffer saw _readPos == _writePos > 0. 
+ // MessageDone() call above, which reset both _readPos and _writePos to 0 (no -1 + // sentinel is involved any more). input.Initialize(out var buf2, out var pos2, out var bufLen2); Assert.AreEqual(4, bufLen2); Assert.AreEqual(10, buf2[0]); diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs index 397f543..53f9b22 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs @@ -170,6 +170,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable /// is the same as end-of-stream, signalled by external call. /// /// + [MethodImpl(MethodImplOptions.AggressiveOptimization)] public void Feed(ReadOnlySpan data) { if (data.IsEmpty) return; @@ -265,6 +266,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable /// (reset to 0 when consumer has caught up OR a [202] message-end sentinel was raised) and /// grow-as-last-resort. Signals the consumer. /// + [MethodImpl(MethodImplOptions.AggressiveOptimization)] private void AppendToBuffer(ReadOnlySpan data) { // Cycle the buffer to 0 if either: @@ -321,9 +323,19 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable /// Called by the consumer to signal "I have finished reading the current message" — typically /// from the /// finally block, AFTER the deserialiser has finished reading and the structurally-complete graph - /// has been returned. Arms a _readPos = -1 sentinel that the next - /// picks up for sliding-window cycling — recycles the buffer to 0 - /// for the next message on the long-lived input. + /// has been returned. 
Resets BOTH _readPos AND _writePos to 0 atomically so the next + /// sees a fresh empty buffer (bufferLength = 0) and the consumer + /// blocks in until the producer's next message arrives — the + /// drain task's first for the next message writes from offset 0 + /// (no cycling needed; positions are already 0). + /// + /// Why reset both positions, not just _readPos-sentinel: a _readPos = -1 + /// sentinel alone leaves _writePos at the previous message's end. If + /// runs BEFORE the drain task's next (a real race when single-chunk + /// messages fit in one transport pass), the consumer reads bufferLength = _writePos = stale + /// value, and starts deserialising the previous message's bytes from offset 0 — corruption. Resetting + /// both atomically here closes the race: always sees bufferLength = 0, + /// and the next writes at _writePos = 0 (no cycle needed). /// /// Why the consumer signals (not the producer): the producer parses [202] /// strictly on the wire — at the moment [202] arrives, the consumer-thread may still be @@ -334,14 +346,23 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable /// consumer has finished reading, AND the wire-side [202] has long since been parsed /// (since the consumer reads only what the producer wrote). /// + /// Thread-safety: safe because the producer (drain task / ) cannot + /// have an in flight at this moment — the consumer's + /// just returned (graph complete = all bytes already appended), and + /// the producer-side for the NEXT message has not yet been issued by + /// the calling thread (strictly sequential per-thread Serialize → Deserialize loop). Any + /// pending [202] still being parsed by the drain task only mutates framing state, never + /// invokes . + /// /// Idempotent: safe to call multiple times. No-op if the session has already /// completed ( is true) — there are no further messages. 
/// public void MessageDone() { if (Volatile.Read(ref _completed)) return; // session already over - Volatile.Write(ref _readPos, -1); - EmitDiagnostic("MessageDone: _readPos sentinel armed for next AppendToBuffer"); + Volatile.Write(ref _writePos, 0); + Volatile.Write(ref _readPos, 0); + EmitDiagnostic("MessageDone: positions reset (writePos=0, readPos=0) for next message"); } // --- IBinaryInputBase (consumer thread) --- @@ -371,7 +392,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable /// After grow, re-reads _buffer to get the new (larger) array. After position reset /// (readPos/writePos set to 0 by producer), re-reads adjusted positions. /// - [MethodImpl(MethodImplOptions.NoInlining)] + [MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.AggressiveOptimization)] public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed) { EmitDiagnostic($"TryAdvanceSegment enter position={position} bufferLength={bufferLength} needed={needed}"); diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index f48f078..52d1fd7 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -193,7 +193,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase /// mode, patches the [201][UINT16 size] header before Advance; in raw mode, simply /// Advances the data bytes), then fires a background flush and acquires the next chunk. /// - [MethodImpl(MethodImplOptions.NoInlining)] + [MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.AggressiveOptimization)] public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) { if (_serializeFlushAndAcquire) @@ -257,6 +257,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase /// Zero-copy: no data copying in either mode. 
The pre-flush wait covers any in-flight /// fire-and-forget flush from on the Pipe-based parallel path. /// + [MethodImpl(MethodImplOptions.AggressiveOptimization)] public void Flush(byte[] buffer, int position) { // Wait for any in-flight flush from previous Grow @@ -360,13 +361,19 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase _pipeWriter.Advance(length); } + [MethodImpl(MethodImplOptions.AggressiveOptimization)] private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) { - var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize); - // Header reservation only in framed mode — raw mode skips it for byte-compat with the // single-shot byte[] output (each chunk holds pure AcBinary bytes, no markers). var headerOffset = _multiMessage ? HeaderSize : 0; + + // chunkSize is the chunk-on-wire total size (header + data) — subtract the header overhead + // here so the actual data payload + header fits exactly in chunkSize bytes on the wire. + // Without this, every WriteFile would emit chunkSize+3 bytes, which spans 2 kernel-page slots + // for any page-aligned chunkSize (Kestrel slab 4 KB, max 64 KB) → fragmented kernel transfers + // + the consumer's framing-state-machine sees chunk headers split across Feed boundaries. 
+ var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize) - headerOffset; var totalRequest = dataSize + headerOffset; var memory = _pipeWriter.GetMemory(totalRequest); @@ -377,6 +384,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase buffer = segment.Array; position = segment.Offset + headerOffset; bufferEnd = segment.Offset + headerOffset + dataSize; + _hasOwnedBuffer = false; EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]"); @@ -402,6 +410,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase buffer = _ownedBuffer; position = headerOffset; bufferEnd = headerOffset + dataSize; + _hasOwnedBuffer = true; } }