[LOADED_DOCS: 3 files, no new loads]

Centralize pipe chunk size and fix buffer reset race

Centralized pipe chunk size config for all AcBinary pipe benchmarks, ensuring app-level and kernel buffer sizes stay in sync. Updated AsyncPipeReaderInput.MessageDone to atomically reset both _readPos and _writePos, preventing stale buffer reads. Improved comments and applied AggressiveOptimization to key methods. Adjusted AcquireChunk to ensure wire chunk fits exactly, avoiding kernel fragmentation. Updated related tests and documentation.
This commit is contained in:
Loretta 2026-05-01 06:37:08 +02:00
parent 5561246e8c
commit 6dbeae9884
4 changed files with 95 additions and 39 deletions

View File

@ -354,6 +354,21 @@ public static class Program
var binaryFastModeNoSgenOption = AcBinarySerializerOptions.FastMode;
binaryFastModeNoSgenOption.UseGeneratedCode = false;
// Pipe-aligned max chunk size for the IBufferWriter / NamedPipe variants — matches
// AsyncPipeWriterOutput.MaxChunkSize (UINT16 max = 65535), the largest payload that fits in one
// [201][UINT16][data] wire frame. The same value also drives the kernel pipe buffer in the
// NamedPipeServerStream ctor (inBufferSize/outBufferSize) so the app-level chunk and the
// kernel-level transfer unit stay in sync — one WriteFile(chunkSize) syscall fits blocking-free in
// one kernel pipe-buffer slot, eliminating the page-segmentation in-syscall stall that plagued
// the previous 4 KB profile (where a 65 KB user-space chunk would still get sliced 16× inside
// the kernel because the default kernel pipe buffer is page-sized).
// Centralised here so ALL pipe-style benchmarks (BufWr new, NamedPipe) share a single source of
// truth — change ONLY THIS line when tuning the pipe chunk size, never inside individual benchmark
// ctors. Earlier 4 KB-baseline measurements remain comparable via the archived .LLM logs in
// Test_Benchmark_Results/Benchmark/.
var binaryFastModePipeChunk = AcBinarySerializerOptions.FastMode;
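// NOTE(review): BufferWriterChunkSize is not assigned here, so this instance keeps the FastMode
// default — presumably equal to AsyncPipeWriterOutput.MaxChunkSize (65535); confirm, or assign it
// explicitly on this instance so the "single source of truth" is visible in code.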
return new List<ISerializerBenchmark>
{
// ============================================================
@ -373,18 +388,17 @@ public static class Program
new AcBinaryBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode"),
// AcBinary via IBufferWriter (FRESH ArrayBufferWriter per call — one-shot scenario).
// BufferWriterChunkSize=4096 → AcBinary advances every 4 KB (smaller internal buffer = sooner Advance/GetMemory cycle,
// matches Kestrel slab + TCP MTU). Despite the property name "ChunkSize", in the IBufferWriter path this is just the
// internal buffer size; wire-format "chunks" only exist in AsyncPipeWriterOutput's chunked-framing mode.
new AcBinaryFreshBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"),
// PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method).
new AcBinaryFreshBufferWriterBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"),
// AcBinary over a long-lived NamedPipe IPC connection — pipe set up ONCE, reused for every iteration.
// Per-iter cost = Byte[] serialize + 4-byte length-prefix framing + pipe write/read syscall + Byte[] deserialize.
// SignalR-style approximation: persistent connection + per-message round-trip + 4 KB initial buffer
// (Kestrel slab + TCP MTU aligned). Single-process loopback, so the number is a lower bound (real
// cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown in
// Ser ms, Des ms = N/A (IsRoundTripOnly).
new AcBinaryNamedPipeBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode (4KB buffer)"),
// PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method) —
// same value drives BOTH the app-level wire chunk AND the kernel pipe buffer (inBufferSize/outBufferSize
// in the NamedPipeServerStream ctor). Persistent connection + multi-message wire framing + max-size
// chunks aligned with the kernel transfer unit. Single-process loopback, so the number is a lower bound
// (real cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown
// in RT ms, Ser/Des = N/A (IsRoundTripOnly).
new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"),
// ============================================================
// MemoryPack — three I/O modes for apples-to-apples comparison
@ -774,12 +788,10 @@ public static class Program
public AcBinaryFreshBufferWriterBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
_order = order;
// BufferWriterChunkSize comes from the caller (central source of truth in CreateSerializers
// — the binaryFastModePipeChunk options instance). Do NOT mutate _options here; tune the chunk
// size in CreateSerializers only.
_options = options;
// Override: 4 KB internal buffer instead of 65535 default — controls how often AcBinary advances
// (Advance + GetMemory) on the underlying IBufferWriter. Smaller buffer = sooner advance = matches
// Kestrel slab + TCP MTU for streaming. NOT a wire-format chunk size (that exists only in
// AsyncPipeWriterOutput's chunked-framing mode); on ArrayBufferWriter this is purely the grow step.
_options.BufferWriterChunkSize = 4096;
OptionsPreset = optionsPreset;
_serialized = AcBinarySerializer.Serialize(order, _options);
}
@ -874,18 +886,26 @@ public static class Program
public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
_order = order;
// BufferWriterChunkSize comes from the caller (central source of truth in CreateSerializers
// — the binaryFastModePipeChunk options instance). Do NOT mutate _options here; tune the chunk
// size in CreateSerializers only.
_options = options;
// EXPERIMENTAL: 64 KB chunk size (UINT16 max) — minimises per-chunk FlushAsync syscalls on NamedPipe.
// Diagnostic comparison vs the 4 KB SignalR-realistic profile to see how much the chunk-flush count
// dominates the NamedPipe overhead. Restore to 4096 (Kestrel/TCP-MTU aligned) once we have the data.
_options.BufferWriterChunkSize = AsyncPipeWriterOutput.MaxChunkSize;
OptionsPreset = optionsPreset;
_serialized = AcBinarySerializer.Serialize(order, _options);
// 1× pipe setup
// 1× pipe setup. Kernel-side pipe buffer (inBufferSize / outBufferSize on the server ctor — the
// client inherits the server-defined buffer size at connect time) matches BufferWriterChunkSize
// exactly: AsyncPipeWriterOutput now treats chunkSize as the chunk-on-wire total size (header +
// data), so one WriteFile(chunkSize) syscall lands in exactly one kernel-page slot — page-aligned,
// no fragmentation, no IRP reordering. _options.BufferWriterChunkSize is the single tunable source.
var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}";
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte,
System.IO.Pipes.PipeOptions.Asynchronous,
inBufferSize: _options.BufferWriterChunkSize,
outBufferSize: _options.BufferWriterChunkSize);
_pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
var serverWait = _pipeServer.WaitForConnectionAsync();

View File

@ -262,15 +262,18 @@ public class AcBinarySerializerPipeParallelTests
[TestMethod]
public void Feed_ChunkEndMarker_AutoResetsForNextMessage()
{
// [202] CHUNK_END is end-of-MESSAGE, NOT end-of-session. The input auto-resets so the same
// long-lived instance can deserialize the next message on the same stream — see
// BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix. Session end is signalled separately by
// an external Complete() call (or stream-EOF on the underlying transport).
// [202] CHUNK_END is end-of-MESSAGE on the WIRE — NOT end-of-session and NOT, by itself,
// a buffer-cursor recycle. On [202], the framing-state machine resets to AwaitingHeader so
// the next [201] header is parsed correctly; buffer-cursor recycling is armed separately by
// the consumer via MessageDone() (typically from the AcBinaryDeserializer.Deserialize<T>(
// AsyncPipeReaderInput, opts) finally block, AFTER the deserialiser has finished reading
// the structurally-complete graph). See BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix.
// Session end is signalled separately by an external Complete() / stream-EOF.
using var input = new AsyncPipeReaderInput(64);
// Message 1
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Feed([202]); // CHUNK_END marker — auto-reset, NOT completion
input.Feed([202]); // CHUNK_END — framing reset only (no buffer-cursor recycle, no completion)
// First message is consumable
input.Initialize(out var buf1, out var pos1, out var bufLen1);
@ -279,16 +282,19 @@ public class AcBinarySerializerPipeParallelTests
Assert.AreEqual(2, buf1[1]);
Assert.AreEqual(3, buf1[2]);
// Consume the bytes (simulate deserializer): reports position = 3 to producer via TryAdvanceSegment.
// Consumer NOT yet at end-of-session, so this should NOT immediately return false — but since the
// [202] reset _readPos to _writePos (= 3), the next AppendToBuffer for message 2 will recycle to 0.
// Simulate the AcBinaryDeserializer.Deserialize<T>(input, opts) finally block: the consumer
// calls MessageDone() AFTER it has finished reading the graph. MessageDone() resets both
// _readPos and _writePos to 0, so the next AppendToBuffer for message 2 writes from offset 0
// on an already-empty buffer (no sentinel and no cycling step involved).
input.MessageDone();
// Message 2 — same long-lived input, just keeps going
input.Feed(WrapInChunkFrame([10, 20, 30, 40]));
input.Feed([202]);
// Re-initialize for the next deserializer call — the buffer was recycled to 0 by the
// sliding-window cycling triggered when AppendToBuffer saw _readPos == _writePos > 0.
// MessageDone() call above, which reset _readPos and _writePos to 0 before message 2
// was appended.
input.Initialize(out var buf2, out var pos2, out var bufLen2);
Assert.AreEqual(4, bufLen2);
Assert.AreEqual(10, buf2[0]);

View File

@ -170,6 +170,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
/// is the same as end-of-stream, signalled by external <see cref="Complete"/> call.</item>
/// </list>
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
public void Feed(ReadOnlySpan<byte> data)
{
if (data.IsEmpty) return;
@ -265,6 +266,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
/// (reset to 0 when consumer has caught up OR a [202] message-end sentinel was raised) and
/// grow-as-last-resort. Signals the consumer.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
private void AppendToBuffer(ReadOnlySpan<byte> data)
{
// Cycle the buffer to 0 if either:
@ -321,9 +323,19 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
/// Called by the consumer to signal "I have finished reading the current message" — typically
/// from the <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// finally block, AFTER the deserialiser has finished reading and the structurally-complete graph
/// has been returned. Arms a <c>_readPos = -1</c> sentinel that the next
/// <see cref="AppendToBuffer"/> picks up for sliding-window cycling — recycles the buffer to 0
/// for the next message on the long-lived input.
/// has been returned. Resets BOTH <c>_readPos</c> AND <c>_writePos</c> to 0 so the next
/// <see cref="Initialize"/> sees a fresh empty buffer (<c>bufferLength = 0</c>) and the consumer
/// blocks in <see cref="TryAdvanceSegment"/> until the producer's next message arrives — the
/// drain task's first <see cref="AppendToBuffer"/> for the next message writes from offset 0
/// (no cycling needed; positions are already 0). The reset is two consecutive volatile writes,
/// not a single atomic operation; it is safe only because the producer is quiescent at this
/// point (see the thread-safety paragraph below).
///
/// <para><b>Why both positions are reset instead of only arming a <c>_readPos = -1</c> sentinel</b>:
/// the sentinel alone leaves <c>_writePos</c> at the previous message's end. If <see cref="Initialize"/>
/// runs BEFORE the drain task's next <see cref="AppendToBuffer"/> (a real race when single-chunk
/// messages fit in one transport pass), the consumer reads a stale <c>bufferLength</c> (the old
/// <c>_writePos</c>) and starts deserialising the previous message's bytes from offset 0 — corruption.
/// Resetting both positions here closes the race: <see cref="Initialize"/> always sees
/// <c>bufferLength = 0</c>, and the next <see cref="AppendToBuffer"/> writes at <c>_writePos = 0</c>
/// (no cycle needed).</para>
///
/// <para><b>Why the consumer signals (not the producer)</b>: the producer parses <c>[202]</c>
/// strictly on the wire — at the moment <c>[202]</c> arrives, the consumer-thread may still be
@ -334,14 +346,23 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
/// consumer has finished reading, AND the wire-side <c>[202]</c> has long since been parsed
/// (since the consumer reads only what the producer wrote).</para>
///
/// <para><b>Thread-safety</b>: safe because the producer (drain task / <see cref="Feed"/>) cannot
/// have an <see cref="AppendToBuffer"/> in flight at this moment — the consumer's
/// <see cref="Deserialize{T}"/> just returned (graph complete = all bytes already appended), and
/// the producer-side <see cref="Serialize{T}"/> for the NEXT message has not yet been issued by
/// the calling thread (strictly sequential per-thread <c>Serialize → Deserialize</c> loop). Any
/// pending <c>[202]</c> still being parsed by the drain task only mutates framing state, never
/// invokes <see cref="AppendToBuffer"/>.</para>
///
/// <para><b>Idempotent</b>: safe to call multiple times. No-op if the session has already
/// completed (<see cref="IsCompleted"/> is <c>true</c>) — there are no further messages.</para>
/// </summary>
public void MessageDone()
{
if (Volatile.Read(ref _completed)) return; // session already over
Volatile.Write(ref _readPos, -1);
EmitDiagnostic("MessageDone: _readPos sentinel armed for next AppendToBuffer");
Volatile.Write(ref _writePos, 0);
Volatile.Write(ref _readPos, 0);
EmitDiagnostic("MessageDone: positions reset (writePos=0, readPos=0) for next message");
}
// --- IBinaryInputBase (consumer thread) ---
@ -371,7 +392,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
/// After grow, re-reads <c>_buffer</c> to get the new (larger) array. After position reset
/// (readPos/writePos set to 0 by producer), re-reads adjusted positions.</para>
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
[MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.AggressiveOptimization)]
public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
{
EmitDiagnostic($"TryAdvanceSegment enter position={position} bufferLength={bufferLength} needed={needed}");

View File

@ -193,7 +193,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
/// mode, patches the <c>[201][UINT16 size]</c> header before Advance; in raw mode, simply
/// Advances the data bytes), then fires a background flush and acquires the next chunk.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
[MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.AggressiveOptimization)]
public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed)
{
if (_serializeFlushAndAcquire)
@ -257,6 +257,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
/// <para>Zero-copy: no data copying in either mode. The pre-flush wait covers any in-flight
/// fire-and-forget flush from <see cref="Grow"/> on the Pipe-based parallel path.</para>
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
public void Flush(byte[] buffer, int position)
{
// Wait for any in-flight flush from previous Grow
@ -360,13 +361,19 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
_pipeWriter.Advance(length);
}
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd)
{
var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize);
// Header reservation only in framed mode — raw mode skips it for byte-compat with the
// single-shot byte[] output (each chunk holds pure AcBinary bytes, no markers).
var headerOffset = _multiMessage ? HeaderSize : 0;
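// chunkSize is the chunk-on-wire total size (header + data) — subtract the header overhead
// here so the actual data payload + header fits exactly in chunkSize bytes on the wire.
// Without this, every WriteFile would emit chunkSize+3 bytes, which spans 2 kernel-page slots
// for any page-aligned chunkSize (Kestrel slab 4 KB, max 64 KB) → fragmented kernel transfers
// + the consumer's framing-state-machine sees chunk headers split across Feed boundaries.
// NOTE(review): because the header is subtracted AFTER Math.Max(requestSize, _chunkSize), the
// resulting dataSize can be smaller than requestSize whenever requestSize > _chunkSize - headerOffset
// — confirm that callers re-request the remainder rather than assuming requestSize bytes fit.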
var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize) - headerOffset;
var totalRequest = dataSize + headerOffset;
var memory = _pipeWriter.GetMemory(totalRequest);
@ -377,6 +384,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
buffer = segment.Array;
position = segment.Offset + headerOffset;
bufferEnd = segment.Offset + headerOffset + dataSize;
_hasOwnedBuffer = false;
EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]");
@ -402,6 +410,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
buffer = _ownedBuffer;
position = headerOffset;
bufferEnd = headerOffset + dataSize;
_hasOwnedBuffer = true;
}
}