diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs
index e5548f5..8000830 100644
--- a/AyCode.Core.Serializers.Console/Program.cs
+++ b/AyCode.Core.Serializers.Console/Program.cs
@@ -69,9 +69,9 @@ public static class Program
private const int TestIterations = 1;
private const int BenchmarkSamples = 1; // Debug: single sample, fast iteration
#else
- private static int WarmupIterations = 5;
- private static int TestIterations = 1;
- private static int BenchmarkSamples = 1;
+ private static int WarmupIterations = 1000; //5000
+ private static int TestIterations = 1000; //1000
+ private static int BenchmarkSamples = 3;
#endif
public static void Main(string[] args)
@@ -817,27 +817,29 @@ public static class Program
/// (
/// + + ).
/// Mirrors what a real consumer (e.g. DeserializeFromPipeReaderAsync) does per message:
- /// fresh input + 2 background tasks (drain + deserialize) per iteration, on top of a long-lived NamedPipe.
+ /// long-lived with multi-message wire framing on top of a long-lived NamedPipe.
///
/// Architecture:
///
/// - Constructor (NOT timed): sets up + ,
/// waits for connection, creates one long-lived /
- /// pair on top of the streams.
- /// - Per-iteration (timed): sender writes one message via
- ///
- /// + FlushAsync on the long-lived pipeWriter; receiver creates a per-message
- /// , spawns a drain Task and a deserialize Task, awaits the deserialize result,
- /// then cancels the drain (which calls input.Complete() in its finally) and disposes the input.
+ /// pair, ONE long-lived
+ /// in multiMessage = true mode, ONE drain Task that pumps
+ /// forever, and ONE deserialize Task that loops AcBinaryDeserializer.Deserialize<T>(input, opts)
+ /// producing into a .
+ /// - Per-iteration (timed): sender writes via
+ ///
+ /// — multi-message wire ([201][UINT16][data]...[202]); the [202] end marker arms the input's
+ /// _readPos = -1 sentinel, so the next message's first AppendToBuffer recycles the buffer to 0.
+ /// Then receiver awaits the channel for the deserialized result.
/// - is a no-op (full round-trip captured in );
/// =true → Ser ms / SerAlloc oszlopok N/A, RT ms = full round-trip.
///
///
- /// Why per-message tasks: the current AcBinary streaming API does not allow long-lived
- /// reuse across multiple messages on a raw transport — see
- /// BINARY_ISSUES.md#accore-bin-i-q4t8. This is therefore the canonical pattern, mirrored after
- /// AcBinaryDeserializer.DeserializeFromPipeReaderAsync's internals. The Task.Run pair + per-iter
- /// AsyncPipeReaderInput allocation are an intrinsic cost of the API today, NOT a benchmark artifact.
+ /// Per-iter overhead: 0 new Task.Run, 0 new AsyncPipeReaderInput, 0 new CancellationTokenSource.
+ /// Pure cost = SerializeChunkedFramed (CPU + chunk-onkénti flush) + kernel write/read syscalls + 1 sync barrier
+ /// (channel) + deserialized graph alloc. The "multi-message reuse" pattern enabled by Q4T8 fix (R5K2 minimum: _readPos = -1
+ /// sentinel + AppendToBuffer sliding-window cycling).
///
/// Approximation note: single-process loopback NamedPipe. Real cross-process / cross-machine SignalR
/// adds further transport latency (TCP, WebSocket framing) on top. The benchmark gives a lower bound.
@@ -853,6 +855,11 @@ public static class Program
private readonly NamedPipeClientStream _pipeClient;
private readonly PipeWriter _pipeWriter;
private readonly PipeReader _pipeReader;
+
+ // Long-lived multi-message receive infrastructure (set up once in ctor).
+ private readonly AsyncPipeReaderInput _input;
+ private readonly CancellationTokenSource _cts;
+ private readonly Task _drainTask;
private bool _disposed;
public string Engine => EngineAcBinary;
@@ -861,35 +868,43 @@ public static class Program
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
- public bool IsRoundTripOnly => true; // Serialize() does the full per-message round-trip; Deserialize() is a no-op
- public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,SerializeChunked+AsyncPipeReaderInput)";
+ public bool IsRoundTripOnly => true;
+ public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)";
public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
_order = order;
_options = options;
- // 4 KB chunk size for the AsyncPipeWriterOutput (raw mode — no [201][UINT16][data] framing).
- // Aligns with Kestrel slab + TCP MTU, the realistic SignalR-style profile.
- _options.BufferWriterChunkSize = 4096;
+ // EXPERIMENTAL: 64 KB chunk size (UINT16 max) — minimises per-chunk FlushAsync syscalls on NamedPipe.
+ // Diagnostic comparison vs the 4 KB SignalR-realistic profile to see how much the chunk-flush count
+ // dominates the NamedPipe overhead. Restore to 4096 (Kestrel/TCP-MTU aligned) once we have the data.
+ _options.BufferWriterChunkSize = AsyncPipeWriterOutput.MaxChunkSize;
OptionsPreset = optionsPreset;
_serialized = AcBinarySerializer.Serialize(order, _options);
- // 1× setup — long-lived NamedPipe + PipeWriter/PipeReader on top of the streams.
- // Byte mode (not Message mode) — AcBinary's chunked-stream wire format manages its own boundaries
- // via the deserializer's structural knowledge of when an object graph ends.
+ // 1× pipe setup
var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}";
-
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
_pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
var serverWait = _pipeServer.WaitForConnectionAsync();
-
_pipeClient.Connect();
serverWait.GetAwaiter().GetResult();
_pipeWriter = PipeWriter.Create(_pipeClient);
_pipeReader = PipeReader.Create(_pipeServer);
+
+ // 1× multi-message receive infrastructure: long-lived input + 1 background drain task.
+ // Per-iter Serialize() does its own Deserialize(input, opts) call on the calling thread —
+ // strictly sequential per the calling thread's loop, so the producer (drain) and consumer
+ // (deserialiser, on the calling thread) cannot race on the buffer.
+ _input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: true);
+ _cts = new CancellationTokenSource();
+
+ // Drain task: pumps PipeReader → input.Feed forever (or until cancel). Single Task.Run for
+ // the full benchmark lifetime (NOT per iteration) — its overhead is amortised across all messages.
+ _drainTask = Task.Run(() => _input.DrainFromAsync(_pipeReader, _cts.Token));
}
public void Warmup(int iterations)
@@ -903,23 +918,16 @@ public static class Program
[MethodImpl(MethodImplOptions.NoInlining)]
public void Serialize()
{
- using var input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false);
- using var cts = new CancellationTokenSource();
+ // Sender: multi-message wire framing — [201][UINT16][data]...[202]. The Flush() inside
+ // SerializeChunkedFramed writes the [202] CHUNK_END marker and flushes the kernel buffer.
+ AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options);
- // Receiver tasks must be ready BEFORE the sender flushes — otherwise the FlushAsync deadlocks
- // waiting for someone to drain the kernel pipe buffer (NamedPipe loopback flow control).
- var drainTask = input.DrainFromAsync(_pipeReader, cts.Token);
- var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, _options), cts.Token);
-
- AcBinarySerializer.SerializeChunked(_order, _pipeWriter, _options);
- _pipeWriter.FlushAsync(cts.Token).AsTask().GetAwaiter().GetResult();
-
- _ = deserTask.GetAwaiter().GetResult();
-
- cts.Cancel();
- try { drainTask.GetAwaiter().GetResult(); }
- catch (OperationCanceledException) { }
- catch (AggregateException ae) when (ae.InnerException is OperationCanceledException) { }
+ // Receiver: synchronous Deserialize on the calling thread. Blocks (via TryAdvanceSegment's
+ // MRES.Wait) until the drain task has fed enough bytes for the structurally-complete graph.
+ // Returns when the graph is complete; finally block calls input.MessageDone() which arms
+ // _readPos = -1 sentinel for the next Append-cycle. Strictly sequential on the calling thread:
+ // the next Serialize() call's SerializeChunkedFramed only runs after this Deserialize returns.
+ _ = AcBinaryDeserializer.Deserialize(_input, _options);
}
[MethodImpl(MethodImplOptions.NoInlining)]
@@ -930,23 +938,9 @@ public static class Program
public bool VerifyRoundTrip()
{
- // Single round-trip via the same path Serialize() uses, with the deserialized graph compared.
- using var input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false);
- using var cts = new CancellationTokenSource();
-
- var drainTask = input.DrainFromAsync(_pipeReader, cts.Token);
- var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, _options), cts.Token);
-
- AcBinarySerializer.SerializeChunked(_order, _pipeWriter, _options);
- _pipeWriter.FlushAsync(cts.Token).AsTask().GetAwaiter().GetResult();
-
- var result = deserTask.GetAwaiter().GetResult();
- cts.Cancel();
-
- try { drainTask.GetAwaiter().GetResult(); }
- catch (OperationCanceledException) { }
- catch (AggregateException ae) when (ae.InnerException is OperationCanceledException) { }
-
+ // Round-trip one message synchronously on the calling thread.
+ AcBinarySerializer.SerializeChunkedFramed(_order, _pipeWriter, _options);
+ var result = AcBinaryDeserializer.Deserialize(_input, _options);
return result != null && DeepEqualsViaJson(_order, result);
}
@@ -954,12 +948,18 @@ public static class Program
{
if (_disposed) return;
_disposed = true;
- // Complete the writer side first → the underlying pipe stream signals EOF, the reader sees it,
- // any in-flight DrainFromAsync exits cleanly. Then dispose the streams.
+
+ // Cancel drain task → DrainFromAsync exits → input.Complete() in its finally.
+ try { _cts.Cancel(); } catch { /* swallow on teardown */ }
+ try { _drainTask.Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
+
+ // Complete writer + dispose pipe lifecycle.
try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
try { _pipeReader.Complete(); } catch { /* swallow on teardown */ }
try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
+ try { _input.Dispose(); } catch { /* swallow on teardown */ }
+ try { _cts.Dispose(); } catch { /* swallow on teardown */ }
}
}
diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
index 67eb220..f98936a 100644
--- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
+++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
@@ -309,11 +309,33 @@ public static partial class AcBinaryDeserializer
///
public static T? Deserialize(AsyncPipeReaderInput input, AcBinarySerializerOptions options)
- => DeserializeSequence(new AsyncPipeReaderInputAdapter(input), typeof(T), options);
+ {
+ try
+ {
+ return DeserializeSequence(new AsyncPipeReaderInputAdapter(input), typeof(T), options);
+ }
+ finally
+ {
+ // Consumer-side "I'm done with this message" signal — arms _readPos = -1 sentinel so the
+ // next AppendToBuffer recycles the buffer to 0 for the next message on the long-lived input.
+ // Race-free: the deserializer has finished reading by this point (try block returned),
+ // so the producer is free to mutate _writePos / _readPos.
+ input.MessageDone();
+ }
+ }
///
public static object? Deserialize(AsyncPipeReaderInput input, Type targetType, AcBinarySerializerOptions options)
- => DeserializeSequence(new AsyncPipeReaderInputAdapter(input), targetType, options);
+ {
+ try
+ {
+ return DeserializeSequence(new AsyncPipeReaderInputAdapter(input), targetType, options);
+ }
+ finally
+ {
+ input.MessageDone();
+ }
+ }
///
/// Deserialize from a with full streaming pipeline
diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs
index 36bcb81..397f543 100644
--- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs
+++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs
@@ -201,18 +201,18 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
}
else if (marker == ChunkEnd)
{
- // [202] = end of CURRENT message (NOT end of session). Two-step signal:
- // (a) reset framing state machine to AwaitingHeader for the next [201] header,
- // (b) write _readPos = -1 sentinel — picked up by the next AppendToBuffer's
- // sliding-window cycling, which resets the buffer to 0 for the new message.
+ // [202] = end of CURRENT message on the WIRE (NOT end of session). Reset only the
+ // framing state machine to AwaitingHeader for the next [201] header.
+ // Buffer-cursor recycling is NOT triggered here — the producer-thread cannot safely
+ // reset _writePos = 0 while the consumer-thread may still be reading earlier bytes
+ // of the just-finished message (multi-chunk messages: by the time the producer
+ // parses [202], the consumer may have only consumed part of the buffer).
+ // The buffer-cycle signal is emitted by the consumer via MessageDone() — typically
+ // from the AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput, opts) finally
+ // block, AFTER the deserialiser has finished reading and returned the graph.
// _completed stays false — only external Complete() / stream-EOF marks session end.
- // The sentinel is wire-format intrinsic: TryAdvanceSegment / Initialize handle
- // _readPos < 0 defensively (treat as "fully consumed"), so the consumer never
- // observes the sentinel directly — by the time the consumer reaches the next
- // Initialize call, AppendToBuffer has already cycled _readPos back to 0.
- EmitDiagnostic("Feed: CHUNK_END [202] received — framing reset, _readPos sentinel armed");
+ EmitDiagnostic("Feed: CHUNK_END [202] received — framing reset; awaiting MessageDone() from consumer for buffer recycle");
_framingState = FramingState.AwaitingHeader;
- Volatile.Write(ref _readPos, -1);
}
else
{
@@ -309,6 +309,41 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
EmitDiagnostic($"Complete writePos={Volatile.Read(ref _writePos)} readPos={Volatile.Read(ref _readPos)}");
}
+ ///
+ /// Whether has been called (or 's underlying
+ /// stream signalled EOF and the finally block closed the input). Once true, the session
+ /// has ended — any pending
+ /// call returns whatever partial buffer is left, and subsequent calls return immediately.
+ ///
+ public bool IsCompleted => Volatile.Read(ref _completed);
+
+ ///
+ /// Called by the consumer to signal "I have finished reading the current message" — typically
+ /// from the
+ /// finally block, AFTER the deserialiser has finished reading and the structurally-complete graph
+ /// has been returned. Arms a _readPos = -1 sentinel that the next
+ /// picks up for sliding-window cycling — recycles the buffer to 0
+ /// for the next message on the long-lived input.
+ ///
+ /// Why the consumer signals (not the producer): the producer parses [202]
+ /// strictly on the wire — at the moment [202] arrives, the consumer-thread may still be
+ /// mid-graph reading earlier bytes of the just-finished message (especially for multi-chunk
+ /// messages, where the producer outpaces the consumer). Recycling the buffer based on
+ /// producer-side [202] alone races with the in-flight consumer read. By emitting the
+ /// signal from the consumer's "I'm done"-moment instead, both orderings are guaranteed: the
+ /// consumer has finished reading, AND the wire-side [202] has long since been parsed
+ /// (since the consumer reads only what the producer wrote).
+ ///
+ /// Idempotent: safe to call multiple times. No-op if the session has already
+ /// completed ( is true) — there are no further messages.
+ ///
+ public void MessageDone()
+ {
+ if (Volatile.Read(ref _completed)) return; // session already over
+ Volatile.Write(ref _readPos, -1);
+ EmitDiagnostic("MessageDone: _readPos sentinel armed for next AppendToBuffer");
+ }
+
// --- IBinaryInputBase (consumer thread) ---
///