diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs
index bc5a25c..e5548f5 100644
--- a/AyCode.Core.Serializers.Console/Program.cs
+++ b/AyCode.Core.Serializers.Console/Program.cs
@@ -57,22 +57,21 @@ public static class Program
private const string ModeRuntime = "Runtime";
private const string ModeHybrid = "Hybrid";
+ private const int JitSleep = 3000;
+
// OptionsPreset values are passed per-instance (constructor argument), not constants —
// each CreateSerializers call line specifies its own preset name (e.g. "FastMode", "NoIntern").
private static readonly UTF8Encoding Utf8NoBom = new(encoderShouldEmitUTF8Identifier: false);
#if DEBUG
- private static int WarmupIterations = 0;
- private static int TestIterations = 1;
- private static int BenchmarkSamples = 1; // Debug: single sample, fast iteration
+ private const int WarmupIterations = 0;
+ private const int TestIterations = 1;
+ private const int BenchmarkSamples = 1; // Debug: single sample, fast iteration
#else
- private static int WarmupIterations = 5000;
- private static int TestIterations = 1000;
- private static int BenchmarkSamples = 5; // Release: 5-sample median for stability (~±5% variance vs. ~±15% single-sample)
-
- //private static int WarmupIterations = 5000;
- //private static int TestIterations = 2000;
+ private static int WarmupIterations = 5;
+ private static int TestIterations = 1;
+ private static int BenchmarkSamples = 1;
#endif
public static void Main(string[] args)
@@ -87,7 +86,7 @@ public static class Program
// Determine layer (which test data to run) and opMode (ser/des/all).
// CLI args take precedence; if no args, show interactive menu.
string layer;
- string opMode = "all";
+ var opMode = "all";
if (args.Length == 0)
{
@@ -175,7 +174,7 @@ public static class Program
}
}
// Let background tiered-JIT compilation drain before we begin measuring.
- Thread.Sleep(3000);
+ Thread.Sleep(JitSleep);
System.Console.WriteLine("✓ Global pre-warmup complete.\n");
}
@@ -215,7 +214,7 @@ public static class Program
var options = AcBinarySerializerOptions.WithoutReferenceHandling;
options.UseStringInterning = StringInterningMode.None;
- byte[] bytes = AcBinarySerializer.Serialize(order, options);
+ var bytes = AcBinarySerializer.Serialize(order, options);
// Warmup (fills caches)
System.Console.WriteLine("Warming up (1000 iterations)...");
for (var i = 0; i < 1000; i++)
@@ -261,6 +260,7 @@ public static class Program
// Round-trip correctness check — once per (cell × serializer), BEFORE warmup. Aborts the entire benchmark on failure.
System.Console.WriteLine("Verifying round-trip correctness...");
+
foreach (var serializer in serializers)
{
if (!serializer.VerifyRoundTrip())
@@ -270,6 +270,7 @@ public static class Program
Environment.Exit(1);
}
}
+
System.Console.WriteLine("✓ All serializers passed round-trip verification.");
// Warmup all serializers
@@ -280,7 +281,7 @@ public static class Program
}
// Wait for tiered JIT background compilation to complete
- Thread.Sleep(3000);
+ Thread.Sleep(JitSleep);
// Run benchmarks
System.Console.WriteLine($"Running benchmarks ({TestIterations} iterations × {BenchmarkSamples} samples median)...\n");
@@ -421,7 +422,7 @@ public static class Program
}
var times = new double[samples];
- for (int s = 0; s < samples; s++)
+ for (var s = 0; s < samples; s++)
{
var sw = Stopwatch.StartNew();
for (var i = 0; i < iterations; i++) action();
@@ -812,41 +813,46 @@ public static class Program
}
///
- /// Benchmarks AcBinary over a long-lived NamedPipe IPC connection — pipe is set up ONCE in the constructor;
- /// each iteration only sends a length-prefixed payload through the existing pipe. Closer to a real SignalR-style
- /// scenario where the connection is established at process start and reused for many messages, rather than the
- /// pathological one-pipe-per-message setup overhead.
+ /// Benchmarks AcBinary over a long-lived NamedPipe IPC connection using the AcBinary native streaming API
+ /// (
+ /// + + ).
+ /// Mirrors what a real consumer (e.g. DeserializeFromPipeReaderAsync) does per message:
+ /// fresh input + 2 background tasks (drain + deserialize) per iteration, on top of a long-lived NamedPipe.
///
/// Architecture:
///
- /// - Constructor: sets up + ,
- /// waits for connection, starts a long-lived background drain task on the server side that reads length-prefixed
- /// messages and pushes deserialized results into a .
- /// - Per-iteration : encodes the payload via the Byte[] API, writes a 4-byte length
- /// prefix + payload bytes to the pipe, then awaits the channel for the server-deserialized result.
- /// - is a no-op (the round-trip happens inside Serialize); same IsRoundTripOnly contract
- /// as the previous one-shot variant.
+ /// - Constructor (NOT timed): sets up + ,
+ /// waits for connection, creates one long-lived /
+ /// pair on top of the streams.
+ /// - Per-iteration (timed): sender writes one message via
+ ///
+ /// + FlushAsync on the long-lived pipeWriter; receiver creates a per-message
+ /// , spawns a drain Task and a deserialize Task, awaits the deserialize result,
+ /// then cancels the drain (which calls input.Complete() in its finally) and disposes the input.
+ /// - is a no-op (full round-trip captured in );
+ /// =true → Ser ms / SerAlloc oszlopok N/A, RT ms = full round-trip.
///
///
- /// What this measures: per-message Byte[] serialize + length-prefix framing + pipe write/read syscall +
- /// kernel context switch + Byte[] deserialize. NOT measured: pipe lifecycle (one-time setup amortized over all iterations
- /// and across all test data cells, since this benchmark runs against many cells).
+ /// Why per-message tasks: the current AcBinary streaming API does not allow long-lived
+ /// reuse across multiple messages on a raw transport — see
+ /// BINARY_ISSUES.md#accore-bin-i-q4t8. This is therefore the canonical pattern, mirrored after
+ /// AcBinaryDeserializer.DeserializeFromPipeReaderAsync's internals. The Task.Run pair + per-iter
+ /// AsyncPipeReaderInput allocation are an intrinsic cost of the API today, NOT a benchmark artifact.
///
- /// Approximation note: this is a single-process loopback pipe. Real cross-process or cross-machine SignalR
- /// will add transport latency (TCP, WebSocket framing) on top of these numbers. The benchmark gives a lower bound for
- /// streaming/IPC scenarios.
+ /// Approximation note: single-process loopback NamedPipe. Real cross-process / cross-machine SignalR
+ /// adds further transport latency (TCP, WebSocket framing) on top. The benchmark gives a lower bound.
///
private sealed class AcBinaryNamedPipeBenchmark : ISerializerBenchmark, IDisposable
{
private readonly TestOrder _order;
private readonly AcBinarySerializerOptions _options;
- private readonly byte[] _serialized; // for SerializedSize reporting
+ private readonly byte[] _serialized; // for SerializedSize reporting only
- // Long-lived pipe + drain pump (set up once in ctor)
+ // Long-lived pipe lifecycle (set up once in ctor — NOT timed).
private readonly NamedPipeServerStream _pipeServer;
private readonly NamedPipeClientStream _pipeClient;
- private readonly Task _drainTask;
- private readonly System.Threading.Channels.Channel _resultChannel;
+ private readonly PipeWriter _pipeWriter;
+ private readonly PipeReader _pipeReader;
private bool _disposed;
public string Engine => EngineAcBinary;
@@ -856,76 +862,34 @@ public static class Program
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
public bool IsRoundTripOnly => true; // Serialize() does the full per-message round-trip; Deserialize() is a no-op
- public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,length-prefixed)";
+ public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,SerializeChunked+AsyncPipeReaderInput)";
public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
_order = order;
_options = options;
- // SignalR-aligned 4 KB initial buffer for the Byte[] API — matches Kestrel slab + TCP MTU,
- // simulates the realistic per-message buffer profile the SignalR transport ends up with.
- // (The 65535 default is fine for big batch encoding but over-allocates on small messages.)
+ // 4 KB chunk size for the AsyncPipeWriterOutput (raw mode — no [201][UINT16][data] framing).
+ // Aligns with Kestrel slab + TCP MTU, the realistic SignalR-style profile.
_options.BufferWriterChunkSize = 4096;
OptionsPreset = optionsPreset;
+
_serialized = AcBinarySerializer.Serialize(order, _options);
- // 1× setup — pipe persists for the lifetime of the benchmark instance.
- // Byte mode (not Message mode) — we frame messages ourselves with a 4-byte length prefix.
- // PipeOptions.Asynchronous → enables overlapped I/O on Windows; harmless on Linux/macOS.
+ // 1× setup — long-lived NamedPipe + PipeWriter/PipeReader on top of the streams.
+ // Byte mode (not Message mode) — AcBinary's chunked-stream wire format manages its own boundaries
+ // via the deserializer's structural knowledge of when an object graph ends.
var pipeName = $"AcBinaryBench-{Guid.NewGuid():N}";
+
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
_pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
- // Establish the connection. Server async-wait + client connect happen in parallel.
var serverWait = _pipeServer.WaitForConnectionAsync();
+
_pipeClient.Connect();
serverWait.GetAwaiter().GetResult();
- _resultChannel = System.Threading.Channels.Channel.CreateUnbounded();
-
- // Long-lived drain loop on the server side. Reads length-prefixed messages until the pipe is closed.
- _drainTask = Task.Run(async () =>
- {
- var lenBuf = new byte[4];
- try
- {
- while (true)
- {
- // Read 4-byte length prefix (handle short reads in a loop)
- if (!await ReadExactAsync(_pipeServer, lenBuf, 0, 4).ConfigureAwait(false))
- break;
- var len = BitConverter.ToInt32(lenBuf, 0);
- if (len <= 0) break; // sentinel / corruption guard
- var data = new byte[len];
- if (!await ReadExactAsync(_pipeServer, data, 0, len).ConfigureAwait(false))
- break;
-
- var result = AcBinaryDeserializer.Deserialize(data, _options);
- await _resultChannel.Writer.WriteAsync(result).ConfigureAwait(false);
- }
- }
- catch (Exception ex) when (ex is System.IO.IOException or ObjectDisposedException)
- {
- // pipe closed — normal teardown path
- }
- finally
- {
- _resultChannel.Writer.TryComplete();
- }
- });
- }
-
- /// Reads exactly bytes; returns false if pipe closed before completion.
- private static async Task ReadExactAsync(System.IO.Stream stream, byte[] buffer, int offset, int count)
- {
- var read = 0;
- while (read < count)
- {
- var n = await stream.ReadAsync(buffer.AsMemory(offset + read, count - read)).ConfigureAwait(false);
- if (n == 0) return false; // EOF
- read += n;
- }
- return true;
+ _pipeWriter = PipeWriter.Create(_pipeClient);
+ _pipeReader = PipeReader.Create(_pipeServer);
}
public void Warmup(int iterations)
@@ -939,22 +903,23 @@ public static class Program
[MethodImpl(MethodImplOptions.NoInlining)]
public void Serialize()
{
- // 1) Byte[] encode (same path as the IoByteArray benchmark)
- var payload = AcBinarySerializer.Serialize(_order, _options);
+ using var input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false);
+ using var cts = new CancellationTokenSource();
- // 2) Length-prefix framing (4 bytes little-endian) — pure benchmark-side framing, not an AcBinary feature.
- // Stack-allocated to avoid per-iter heap traffic for the prefix.
- Span lenBuf = stackalloc byte[4];
- BitConverter.TryWriteBytes(lenBuf, payload.Length);
+ // Receiver tasks must be ready BEFORE the sender flushes — otherwise the FlushAsync deadlocks
+ // waiting for someone to drain the kernel pipe buffer (NamedPipe loopback flow control).
+ var drainTask = input.DrainFromAsync(_pipeReader, cts.Token);
+ var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, _options), cts.Token);
- // 3) Sync write to the pipe — Stream.Write blocks until the OS accepts the bytes into the pipe buffer.
- _pipeClient.Write(lenBuf);
- _pipeClient.Write(payload, 0, payload.Length);
- _pipeClient.Flush();
+ AcBinarySerializer.SerializeChunked(_order, _pipeWriter, _options);
+ _pipeWriter.FlushAsync(cts.Token).AsTask().GetAwaiter().GetResult();
- // 4) Wait for the server drain loop to deserialize and post the result. Sync wait via channel reader.
- // A console app has no SynchronizationContext, so .GetAwaiter().GetResult() is deadlock-safe.
- _resultChannel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();
+ _ = deserTask.GetAwaiter().GetResult();
+
+ cts.Cancel();
+ try { drainTask.GetAwaiter().GetResult(); }
+ catch (OperationCanceledException) { }
+ catch (AggregateException ae) when (ae.InnerException is OperationCanceledException) { }
}
[MethodImpl(MethodImplOptions.NoInlining)]
@@ -965,14 +930,23 @@ public static class Program
public bool VerifyRoundTrip()
{
- // Round-trip a single message and compare structurally.
- var payload = AcBinarySerializer.Serialize(_order, _options);
- Span lenBuf = stackalloc byte[4];
- BitConverter.TryWriteBytes(lenBuf, payload.Length);
- _pipeClient.Write(lenBuf);
- _pipeClient.Write(payload, 0, payload.Length);
- _pipeClient.Flush();
- var result = _resultChannel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();
+ // Single round-trip via the same path Serialize() uses, with the deserialized graph compared.
+ using var input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false);
+ using var cts = new CancellationTokenSource();
+
+ var drainTask = input.DrainFromAsync(_pipeReader, cts.Token);
+ var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, _options), cts.Token);
+
+ AcBinarySerializer.SerializeChunked(_order, _pipeWriter, _options);
+ _pipeWriter.FlushAsync(cts.Token).AsTask().GetAwaiter().GetResult();
+
+ var result = deserTask.GetAwaiter().GetResult();
+ cts.Cancel();
+
+ try { drainTask.GetAwaiter().GetResult(); }
+ catch (OperationCanceledException) { }
+ catch (AggregateException ae) when (ae.InnerException is OperationCanceledException) { }
+
return result != null && DeepEqualsViaJson(_order, result);
}
@@ -980,10 +954,12 @@ public static class Program
{
if (_disposed) return;
_disposed = true;
- // Closing the client triggers EOF on the server's ReadAsync → drain loop exits gracefully.
+ // Complete the writer side first → the underlying pipe stream signals EOF, the reader sees it,
+ // any in-flight DrainFromAsync exits cleanly. Then dispose the streams.
+ try { _pipeWriter.CompleteAsync().AsTask().Wait(TimeSpan.FromSeconds(2)); } catch { /* swallow on teardown */ }
+ try { _pipeReader.Complete(); } catch { /* swallow on teardown */ }
try { _pipeClient.Dispose(); } catch { /* swallow on teardown */ }
try { _pipeServer.Dispose(); } catch { /* swallow on teardown */ }
- try { _drainTask.Wait(TimeSpan.FromSeconds(5)); } catch { /* swallow on teardown */ }
}
}
@@ -1178,7 +1154,7 @@ public static class Program
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
ReferenceHandler = System.Text.Json.Serialization.ReferenceHandler.IgnoreCycles
};
- _serialized = System.Text.Json.JsonSerializer.Serialize(order, _options);
+ _serialized = JsonSerializer.Serialize(order, _options);
_serializedUtf8 = Utf8NoBom.GetBytes(_serialized);
}
@@ -1192,15 +1168,15 @@ public static class Program
}
[MethodImpl(MethodImplOptions.NoInlining)]
- public void Serialize() => System.Text.Json.JsonSerializer.Serialize(_order, _options);
+ public void Serialize() => JsonSerializer.Serialize(_order, _options);
[MethodImpl(MethodImplOptions.NoInlining)]
- public void Deserialize() => System.Text.Json.JsonSerializer.Deserialize(_serialized, _options);
+ public void Deserialize() => JsonSerializer.Deserialize(_serialized, _options);
public bool VerifyRoundTrip()
{
- var json = System.Text.Json.JsonSerializer.Serialize(_order, _options);
- var roundTripped = System.Text.Json.JsonSerializer.Deserialize(json, _options);
+ var json = JsonSerializer.Serialize(_order, _options);
+ var roundTripped = JsonSerializer.Deserialize(json, _options);
return DeepEqualsViaJson(_order, roundTripped);
}
}
diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs
index 4543bd5..ef60d10 100644
--- a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs
+++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs
@@ -12,7 +12,7 @@ namespace AyCode.Core.Tests.Serialization;
/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9), plus runtime type-detect
/// sanity pinning (Step 4).
///
-/// Tests run with 's default stripChunkFraming = true —
+/// Tests run with 's default multiMessage = true —
/// expects the AsyncSegment chunked wire format
/// [201][UINT16 LE size][data] per chunk, tolerates [200] CHUNK_START prefix, and
/// signals end-of-stream on [202] CHUNK_END. The helper
@@ -260,21 +260,68 @@ public class AcBinarySerializerPipeParallelTests
}
[TestMethod]
- public void Feed_ChunkEndMarker_SignalsCompletion()
+ public void Feed_ChunkEndMarker_AutoResetsForNextMessage()
{
- // [202] CHUNK_END alone (without external Complete()) should signal end-of-stream.
+ // [202] CHUNK_END is end-of-MESSAGE, NOT end-of-session. The input auto-resets so the same
+ // long-lived instance can deserialize the next message on the same stream — see
+ // BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix. Session end is signalled separately by
+ // an external Complete() call (or stream-EOF on the underlying transport).
+ using var input = new AsyncPipeReaderInput(64);
+
+ // Message 1
+ input.Feed(WrapInChunkFrame([1, 2, 3]));
+ input.Feed([202]); // CHUNK_END marker — auto-reset, NOT completion
+
+ // First message is consumable
+ input.Initialize(out var buf1, out var pos1, out var bufLen1);
+ Assert.AreEqual(3, bufLen1);
+ Assert.AreEqual(1, buf1[0]);
+ Assert.AreEqual(2, buf1[1]);
+ Assert.AreEqual(3, buf1[2]);
+
+ // Consume the bytes (simulate deserializer): reports position = 3 to producer via TryAdvanceSegment.
+ // Consumer NOT yet at end-of-session, so this should NOT immediately return false — but since the
+ // [202] reset _readPos to _writePos (= 3), the next AppendToBuffer for message 2 will recycle to 0.
+
+ // Message 2 — same long-lived input, just keeps going
+ input.Feed(WrapInChunkFrame([10, 20, 30, 40]));
+ input.Feed([202]);
+
+ // Re-initialize for the next deserializer call — the buffer was recycled to 0 by the
+ // sliding-window cycling triggered when AppendToBuffer saw _readPos == _writePos > 0.
+ input.Initialize(out var buf2, out var pos2, out var bufLen2);
+ Assert.AreEqual(4, bufLen2);
+ Assert.AreEqual(10, buf2[0]);
+ Assert.AreEqual(20, buf2[1]);
+ Assert.AreEqual(30, buf2[2]);
+ Assert.AreEqual(40, buf2[3]);
+
+ // Now signal end-of-session explicitly
+ input.Complete();
+
+ // After Complete, TryAdvanceSegment returns false on empty — session truly ended
+ var pos3 = bufLen2;
+ var bufLen3 = bufLen2;
+ var buf3 = buf2;
+ var hasMore = input.TryAdvanceSegment(ref buf3, ref pos3, ref bufLen3, 1);
+ Assert.IsFalse(hasMore);
+ }
+
+ [TestMethod]
+ public void Feed_ExternalComplete_SignalsEndOfSession()
+ {
+ // Explicit Complete() (or stream-EOF in the DrainFromAsync path) is the session-end signal —
+ // distinct from per-message [202] markers which only auto-reset for the next message.
using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
- input.Feed([202]); // CHUNK_END marker only — no external Complete()
+ input.Complete(); // external session-end
- // Should observe completion: TryAdvanceSegment returns false on empty after consume
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(3, bufferLength);
position = bufferLength;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
-
Assert.IsFalse(hasMore);
}
@@ -431,7 +478,7 @@ public class AcBinarySerializerPipeParallelTests
{
// SerializeChunkedFramed — writes [201][UINT16][data] per chunk on the wire.
// AsyncPipeReaderInput.Feed strips framing internally on the receive side
- // (default stripChunkFraming = true).
+ // (default multiMessage = true).
AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts);
}
finally
diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
index fe9f8fb..67eb220 100644
--- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
+++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs
@@ -350,11 +350,11 @@ public static partial class AcBinaryDeserializer
var opts = options ?? AcBinarySerializerOptions.Default;
- // Raw mode (stripChunkFraming: false) — bytes drained from the PipeReader are forwarded
+ // Single-message mode (multiMessage: false) — bytes drained from the PipeReader are forwarded
// verbatim to the deserialization buffer. Pair with AcBinarySerializer.SerializeChunked
- // (raw byte stream) on the producer side; for chunked-framed wire formats the parser
+ // (raw byte stream) on the producer side; for multi-message framed wire formats the parser
// strips framing upstream and feeds only data bytes here.
- using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2, stripChunkFraming: false);
+ using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2, multiMessage: false);
var deserTask = Task.Run(() => Deserialize(input, opts), ct);
await input.DrainFromAsync(reader, ct).ConfigureAwait(false);
diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs
index f635f95..92a149d 100644
--- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs
+++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs
@@ -452,7 +452,7 @@ public static partial class AcBinarySerializer
public static int SerializeChunked(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{
if (pipe is null) throw new ArgumentNullException(nameof(pipe));
- return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: false);
+ return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, multiMessage: false);
}
///
@@ -479,7 +479,7 @@ public static partial class AcBinarySerializer
/// Serializer options (type wrappers, reference handling, interning, etc.).
/// Total serialized bytes written.
public static int SerializeChunked(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options)
- => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: false);
+ => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, multiMessage: false);
///
/// Serialize a value into a chunked stream where each chunk carries a self-describing
@@ -511,7 +511,7 @@ public static partial class AcBinarySerializer
public static int SerializeChunkedFramed(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{
if (pipe is null) throw new ArgumentNullException(nameof(pipe));
- return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: true);
+ return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, multiMessage: true);
}
///
@@ -524,7 +524,7 @@ public static partial class AcBinarySerializer
/// .
///
public static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options)
- => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: true);
+ => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, multiMessage: true);
///
/// Internal flush-tunable framed PipeWriter overload — used by AyCode.Services
@@ -533,7 +533,7 @@ public static partial class AcBinarySerializer
/// on a guaranteed parallel-capable writer.
///
internal static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout)
- => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true);
+ => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, multiMessage: true);
///
/// Internal legacy alias for
@@ -542,14 +542,14 @@ public static partial class AcBinarySerializer
/// (framed wire format with [201][UINT16][data] per chunk + [202] end marker).
///
internal static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout)
- => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true);
+ => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, multiMessage: true);
///
/// Common pipe-output serialization core. Same loop for both raw ()
/// and framed () modes — the only difference flows through
- /// into the ctor.
+ /// into the ctor.
///
- private static int SerializeToPipeWriterCore(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout, bool emitChunkFraming)
+ private static int SerializeToPipeWriterCore(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout, bool multiMessage)
{
if (value == null)
{
@@ -563,7 +563,7 @@ public static partial class AcBinarySerializer
var runtimeType = value.GetType();
var context = BinarySerializationContextPool.Get(options);
- context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, emitChunkFraming, waitForFlush, flushTimeout);
+ context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, multiMessage, waitForFlush, flushTimeout);
context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd);
try
diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs
index 7245688..36bcb81 100644
--- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs
+++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs
@@ -15,11 +15,14 @@ namespace AyCode.Core.Serializers.Binaries;
/// .NET BCL convention for type-level Async prefix (AsyncEnumerable,
/// IAsyncDisposable, AsyncLocal<T>, ...).
///
-/// behavior is driven by the stripChunkFraming ctor flag:
+/// behavior is driven by the multiMessage ctor flag:
/// true (default) — parses [201][UINT16][data] chunked frames + [202] end
/// marker (matches AsyncPipeWriterOutput framed output and SignalR's AsyncSegment wire
-/// format); false — appends bytes verbatim (matches AcBinarySerializer.SerializeChunked
-/// raw output drained from a ).
+/// format); on every [202] the input auto-resets for the next message — multiple
+///
+/// calls can reuse the same long-lived input over a single transport. false — appends bytes
+/// verbatim (matches AcBinarySerializer.SerializeChunked raw output drained from a
+/// ); single-message scenario, no auto-reset.
///
/// Usage modes:
///
@@ -60,18 +63,22 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
private int _readPos; // consumer reports consumed position here
private bool _completed;
- // Whether Feed() should strip [201][UINT16][data] chunked framing (true, default — matches
- // SignalR-style multiplexed wire) or append bytes verbatim (false — matches the raw output
- // of SerializeChunked / single-shot byte[] over PipeWriter).
- private readonly bool _stripChunkFraming;
+ // multi-message wire framing flag:
+ // true (default): Feed() parses [201][UINT16][data] chunked framing + [202] CHUNK_END markers,
+ // auto-resets the buffer cursor on every [202] for the next message.
+ // Matches AsyncPipeWriterOutput multi-message wire and SignalR AsyncSegment.
+ // false: Feed() appends bytes verbatim (no wire-format interpretation, single message
+ // scenario). Matches AcBinarySerializer.SerializeChunked raw output drained
+ // from a PipeReader.
+ private readonly bool _multiMessage;
// Framing state machine — parses [201][UINT16 LE size][data] frames + [202] CHUNK_END.
- // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's framed
- // output and SignalR's AsyncSegment chunked frame format. Only active when
- // _stripChunkFraming = true.
+ // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's
+ // multi-message output and SignalR's AsyncSegment chunked frame format. Only active when
+ // _multiMessage = true.
private const byte ChunkStart = 200; // CHUNK_START — tolerated, skipped
private const byte ChunkData = 201; // CHUNK_DATA — header followed by [UINT16 size][data]
- private const byte ChunkEnd = 202; // CHUNK_END — signals end-of-stream
+ private const byte ChunkEnd = 202; // CHUNK_END — signals end-of-MESSAGE (auto-reset for next message)
private FramingState _framingState = FramingState.AwaitingHeader;
@@ -84,7 +91,8 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
AwaitingSizeLow, // have [201], expect UINT16 LE low byte
AwaitingSizeHigh, // have low, expect UINT16 LE high byte
AwaitingData, // expect _bytesRemainingInChunk data bytes
- Done // saw [202], ignore further bytes
+ // No "Done" state — [202] auto-resets to AwaitingHeader for next-message reuse.
+ // Session-end is signalled by external Complete() / stream-EOF, NOT by framing-state.
}
private readonly ManualResetEventSlim _dataAvailable;
@@ -116,20 +124,26 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
/// 4 KB chunk size, 128 KB for the standalone 64 KB default).
///
/// Initial buffer size. Rounded up by ArrayPool.
- ///
- /// true (default): parses [201][UINT16][data] chunked frames +
- /// [202] end marker (matches framed output and
- /// SignalR's AsyncSegment chunked wire format).
- /// false: appends bytes verbatim — for raw byte streams (matches
- /// AcBinarySerializer.SerializeChunked output and the single-shot
- /// byte[] output).
+ ///
+ /// true (default): parses the multi-message wire framing
+ /// ([201][UINT16][data] chunks + [202] end-of-MESSAGE marker — matches
+ /// multi-message output and SignalR's AsyncSegment).
+ /// On every [202] the input auto-resets the buffer cursor for the next message —
+ /// the same long-lived input can be reused across many
+ ///
+ /// calls without allocating a fresh instance per message. End of session is signalled by an
+ /// external call or stream-EOF, NOT by [202].
+ ///
+ /// false: appends bytes verbatim — single-message scenario where the
+ /// stream lifecycle equals the message lifecycle (matches AcBinarySerializer.SerializeChunked
+ /// raw output, paired with pipeWriter.CompleteAsync() as the end-of-message signal).
///
- public AsyncPipeReaderInput(int initialCapacity, bool stripChunkFraming = true)
+ public AsyncPipeReaderInput(int initialCapacity, bool multiMessage = true)
{
if (initialCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(initialCapacity));
_buffer = ArrayPool.Shared.Rent(initialCapacity);
- _stripChunkFraming = stripChunkFraming;
+ _multiMessage = multiMessage;
_dataAvailable = new ManualResetEventSlim(false);
}
@@ -137,42 +151,42 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
///
/// Feeds bytes into the consumer-visible buffer. Behavior is driven by the
- /// stripChunkFraming ctor flag:
+ /// multiMessage ctor flag:
///
- /// - stripChunkFraming = true (default): expects the chunked wire format
- /// [201][UINT16 LE size][data] per chunk, tolerates [200]
- /// CHUNK_START prefix, and signals end-of-stream on [202] CHUNK_END. State
- /// is preserved across Feed calls — partial frame headers, mid-size boundaries,
- /// and mid-data boundaries all resume correctly. On [202], sets the completion
- /// flag and signals waiting consumers — equivalent to an external
- /// call. Bytes after [202] are ignored.
- /// - stripChunkFraming = false: appends bytes verbatim — no wire-format
- /// interpretation. The producer must pass only payload bytes (e.g. raw byte stream
- /// drained from a paired with
- /// AcBinarySerializer.SerializeChunked).
+ /// - multiMessage = true (default): expects the multi-message wire format
+ /// [201][UINT16 LE size][data] per chunk, tolerates [200] CHUNK_START
+ /// prefix, treats [202] CHUNK_END as end-of-MESSAGE. State is preserved
+ /// across Feed calls — partial frame headers, mid-size boundaries, and mid-data
+ /// boundaries all resume correctly. On [202], the input auto-resets the
+ /// buffer cursor for the next message (signals the producer's sliding-window cycling
+ /// to recycle the buffer on next ) and resets the framing
+ /// state machine to AwaitingHeader — the next bytes are expected to be a new
+ /// [201]... frame. End-of-session is NOT signalled by [202]; only an
+ /// external call or stream-EOF marks the session as ended.
+ /// - multiMessage = false: appends bytes verbatim — no wire-format interpretation.
+ /// The producer passes only payload bytes (e.g. raw byte stream drained from a
+ /// paired with
+ /// AcBinarySerializer.SerializeChunked). Single-message scenario; end-of-message
+ /// is the same as end-of-stream, signalled by external call.
///
///
public void Feed(ReadOnlySpan data)
{
if (data.IsEmpty) return;
- if (!_stripChunkFraming)
+ if (!_multiMessage)
{
- // Raw mode: append verbatim, no framing interpretation.
+ // Single-message mode: append verbatim, no framing interpretation.
AppendToBuffer(data);
return;
}
- // Framed mode: state machine parses [201][UINT16 LE size][data] frames + [202] end marker.
+ // Multi-message mode: state machine parses [201][UINT16 LE size][data] frames + [202] end-of-message marker.
var i = 0;
while (i < data.Length)
{
switch (_framingState)
{
- case FramingState.Done:
- EmitDiagnostic($"Feed: bytes after CHUNK_END ignored, count={data.Length - i}");
- return;
-
case FramingState.AwaitingHeader:
{
var marker = data[i++];
@@ -187,11 +201,18 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
}
else if (marker == ChunkEnd)
{
- EmitDiagnostic("Feed: CHUNK_END [202] received, signaling completion");
- _framingState = FramingState.Done;
- Volatile.Write(ref _completed, true);
- _dataAvailable.Set();
- return;
+ // [202] = end of CURRENT message (NOT end of session). Two-step signal:
+ // (a) reset framing state machine to AwaitingHeader for the next [201] header,
+ // (b) write _readPos = -1 sentinel — picked up by the next AppendToBuffer's
+ // sliding-window cycling, which resets the buffer to 0 for the new message.
+ // _completed stays false — only external Complete() / stream-EOF marks session end.
+ // The sentinel is wire-format intrinsic: TryAdvanceSegment / Initialize handle
+ // _readPos < 0 defensively (treat as "fully consumed"), so the consumer never
+ // observes the sentinel directly — by the time the consumer reaches the next
+ // Initialize call, AppendToBuffer has already cycled _readPos back to 0.
+ EmitDiagnostic("Feed: CHUNK_END [202] received — framing reset, _readPos sentinel armed");
+ _framingState = FramingState.AwaitingHeader;
+ Volatile.Write(ref _readPos, -1);
}
else
{
@@ -241,13 +262,19 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
///
/// Appends data bytes to the internal buffer with sliding-window cycling
- /// (reset to 0 when consumer has caught up) and grow-as-last-resort. Signals the consumer.
+ /// (reset to 0 when consumer has caught up OR a [202] message-end sentinel was raised) and
+ /// grow-as-last-resort. Signals the consumer.
///
private void AppendToBuffer(ReadOnlySpan data)
{
- // If consumer consumed everything → reset positions to 0 (sliding-window cycling)
+ // Cycle the buffer to 0 if either:
+ // (a) consumer has caught up to _writePos (classic sliding-window pattern), OR
+ // (b) a [202] CHUNK_END marker was just parsed and armed _readPos = -1 (sentinel) —
+ // the message is complete on the wire, the consumer (per wire-format guarantee)
+ // has read or will read exactly _writePos bytes; the next bytes are the start of
+ // a new message and belong at offset 0.
var rp = Volatile.Read(ref _readPos);
- if (rp > 0 && rp == _writePos)
+ if (rp < 0 || (rp > 0 && rp == _writePos))
{
EmitDiagnostic($"AppendToBuffer reset positions rp={rp} wp={_writePos} → 0");
_writePos = 0;
@@ -314,8 +341,14 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
{
EmitDiagnostic($"TryAdvanceSegment enter position={position} bufferLength={bufferLength} needed={needed}");
- // Report how far we've consumed — enables producer to reset positions to 0
- Volatile.Write(ref _readPos, position);
+ // Report how far we've consumed — enables producer to reset positions to 0.
+ // Sentinel respect: if _readPos < 0 (a [202] CHUNK_END marker armed it), DO NOT overwrite
+ // the sentinel — the next AppendToBuffer needs to see it to cycle the buffer to 0.
+ // The local sentinel-defence below ensures correct logic during the transient race window.
+ if (Volatile.Read(ref _readPos) >= 0)
+ {
+ Volatile.Write(ref _readPos, position);
+ }
while (true)
{
@@ -323,6 +356,11 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
int rp = Volatile.Read(ref _readPos);
int wp = Volatile.Read(ref _writePos);
+ // Sentinel defence: if [202] armed _readPos = -1 while we were reading, treat the
+ // sentinel as "use our local position" — the cycle hasn't fired yet (no AppendToBuffer
+ // has run since [202]); we still consume from our own position into the existing buffer.
+ if (rp < 0) rp = position;
+
if (wp - rp >= needed)
{
buffer = _buffer; // may be new array after grow
@@ -354,6 +392,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
_dataAvailable.Reset();
rp = Volatile.Read(ref _readPos);
+ if (rp < 0) rp = position; // sentinel defence (same as the top of the loop)
wp = Volatile.Read(ref _writePos);
if (wp - rp >= needed || Volatile.Read(ref _completed)) continue;
diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
index eb1e523..f48f078 100644
--- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
+++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
@@ -9,23 +9,25 @@ namespace AyCode.Core.Serializers.Binaries;
///
/// Binary output that writes to a PipeWriter chunk-by-chunk via the PipeWriter's natural slabbing.
-/// Two wire-format modes are supported, selected by the emitChunkFraming ctor flag:
+/// Two wire-format modes are supported, selected by the multiMessage ctor flag:
///
///
-/// - emitChunkFraming = false (raw): pure AcBinary bytes are written into
+///
- multiMessage = false (single-message): pure AcBinary bytes are written into
/// the PipeWriter's slabs and committed via Advance — no per-chunk header bytes appear on the
/// wire. Bit-compatible with the single-shot Serialize(value, opts) → byte[] output.
/// The receiver can deserialize the byte stream as-is (e.g. via
/// AcBinaryDeserializer.Deserialize(byte[]) after collecting, or any raw
-/// PipeReader-based path).
+/// PipeReader-based path). End-of-message = end-of-stream; caller signals it by closing
+/// the writer (pipeWriter.CompleteAsync()).
///
-/// - emitChunkFraming = true (framed): each chunk gets a 3-byte header
+///
- multiMessage = true (default): each chunk gets a 3-byte header
/// [201][UINT16 size][data]. The header is reserved at the start of each acquired slab;
/// the serializer writes data after it, and on commit the size is patched and the full chunk
-/// is Advanced (zero-copy). The protocol layer writes a single [202] byte after all
-/// chunks to signal end-of-stream. This is the multiplexed wire format used by SignalR's
-/// BinaryProtocolMode.AsyncSegment and any custom multiplexed protocol where the
-/// receiver needs incremental chunk-boundary detection.
+/// is Advanced (zero-copy). At end-of-serialize, writes a [202]
+/// CHUNK_END marker (symmetrical with [201]) — this signals end-of-MESSAGE to the
+/// receiver, which auto-resets its for the next message
+/// on the same long-lived stream. Used by SignalR's BinaryProtocolMode.AsyncSegment
+/// and any custom multi-message protocol over a long-lived transport.
///
///
/// Backpressure modes (controlled by waitForFlush) — independent of framing:
@@ -58,6 +60,9 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
/// MsgAsyncChunkData type marker (201).
private const byte ChunkDataMarker = 201;
+ /// MsgAsyncChunkEnd marker (202) — written at end-of-serialize in framed mode.
+ private const byte ChunkEndMarker = 202;
+
/// Header size: 1 byte type + 2 bytes UINT16 size.
private const int HeaderSize = 3;
@@ -79,7 +84,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
private readonly PipeWriter _pipeWriter;
private readonly int _chunkSize;
- private readonly bool _emitChunkFraming;
+ private readonly bool _multiMessage;
private readonly bool _waitForFlush;
private readonly bool _serializeFlushAndAcquire;
private readonly TimeSpan _flushTimeout;
@@ -106,22 +111,24 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
[Conditional("DEBUG")]
private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message);
- /// Creates an output bound to the given PipeWriter — framed or raw mode per .
+ /// Creates an output bound to the given PipeWriter — multi-message or single-message mode per .
/// Target pipe (typically Kestrel's transport output for SignalR, NamedPipe, FileStream, or any custom ).
/// Per-chunk data size (max ). Default 4 KB matches Kestrel's slab size.
- /// true → write [201][UINT16][data] per-chunk header (multiplexed wire format).
- /// false → raw AcBinary bytes only, byte-compatible with the single-shot byte[] output. See class summary.
+ /// true (default) → multi-message wire format: [201][UINT16][data] per-chunk header
+ /// + [202] end-of-message marker on Flush. Receiver auto-resets between messages.
+ /// false → single-message: raw AcBinary bytes only, byte-compatible with the single-shot byte[] output;
+ /// caller signals end-of-message by closing the writer. See class summary.
/// See class summary — pipeline parallelism (true) vs adaptive (false).
/// Per-flush timeout. null →
/// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers.
- public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool emitChunkFraming = true, bool waitForFlush = true, TimeSpan? flushTimeout = null)
+ public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool multiMessage = true, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{
if (chunkSize > MaxChunkSize)
throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max).");
_pipeWriter = pipeWriter;
_chunkSize = chunkSize;
- _emitChunkFraming = emitChunkFraming;
+ _multiMessage = multiMessage;
_waitForFlush = waitForFlush;
// null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms).
@@ -230,10 +237,25 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart);
///
- /// Commits the last (partial) chunk to the PipeWriter — in framed mode patches the
- /// [201][UINT16 size] header before Advance, in raw mode simply Advances the data.
- /// Zero-copy: no data copying. Does NOT flush to network — in framed mode the protocol writes
- /// [202] and flushes after.
+ /// Commits the last (partial) chunk to the PipeWriter, writes the [202] CHUNK_END marker
+ /// in framed mode, and flushes everything to the underlying writer. End-of-serialize is fully
+ /// owned by this output: when returns, every byte is downstream — the
+ /// caller does NOT need a follow-up pipeWriter.FlushAsync().
+ ///
+ /// Behaviour by mode:
+ ///
+ /// - Raw mode (emitChunkFraming = false): final chunk Advance →
+ /// FlushAsync. No end marker (raw mode has no framing concept).
+ /// - Framed mode (emitChunkFraming = true): final chunk Advance →
+ /// [202] CHUNK_END marker (symmetric with the [201] header that
+ /// writes per chunk) → FlushAsync. The end
+ /// marker is written here so the wire-format contract is fully owned by this output;
+ /// protocol layers above (e.g. AcBinaryHubProtocol) no longer need to inject
+ /// their own [202] + flush.
+ ///
+ ///
+ /// Zero-copy: no data copying in either mode. The pre-flush wait covers any in-flight
+ /// fire-and-forget flush from on the Pipe-based parallel path.
///
public void Flush(byte[] buffer, int position)
{
@@ -242,6 +264,21 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
CommitCurrentChunk(buffer, position);
+ // Framed mode: write the [202] CHUNK_END marker — symmetric with [201] header that
+ // CommitCurrentChunk writes per chunk. Both ends of the framing contract are owned here.
+ if (_multiMessage)
+ {
+ var span = _pipeWriter.GetSpan(1);
+ span[0] = ChunkEndMarker;
+ _pipeWriter.Advance(1);
+
+ EmitDiagnostic("Flush[framed]: wrote [202] CHUNK_END");
+ }
+
+ // Always flush — final chunk (and end marker, if framed) → downstream. Caller does not
+ // need a follow-up FlushAsync.
+ SyncAwaitFlush(_pipeWriter.FlushAsync());
+
// End of serialize lifecycle — return the owned fallback buffer to ArrayPool exactly
// once (NOT per chunk). The buffer was reused across all chunks in this lifecycle;
// releasing it now avoids per-chunk rent/return churn even when the fallback path
@@ -285,7 +322,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
var dataBytes = position - _currentChunkStart;
if (dataBytes <= 0) return;
- if (_emitChunkFraming)
+ if (_multiMessage)
{
var headerStart = _currentChunkStart - HeaderSize;
buffer[headerStart] = ChunkDataMarker;
@@ -329,11 +366,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
// Header reservation only in framed mode — raw mode skips it for byte-compat with the
// single-shot byte[] output (each chunk holds pure AcBinary bytes, no markers).
- var headerOffset = _emitChunkFraming ? HeaderSize : 0;
+ var headerOffset = _multiMessage ? HeaderSize : 0;
var totalRequest = dataSize + headerOffset;
var memory = _pipeWriter.GetMemory(totalRequest);
- EmitDiagnostic($"AcquireChunk: framed={_emitChunkFraming} requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}");
+ EmitDiagnostic($"AcquireChunk: framed={_multiMessage} requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}");
if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && segment.Array != null)
{
diff --git a/AyCode.Core/docs/BINARY/BINARY_ISSUES.md b/AyCode.Core/docs/BINARY/BINARY_ISSUES.md
index dbdf005..d662996 100644
--- a/AyCode.Core/docs/BINARY/BINARY_ISSUES.md
+++ b/AyCode.Core/docs/BINARY/BINARY_ISSUES.md
@@ -103,6 +103,75 @@ Same `TryGetArray` fallback as `BufferWriterBinaryOutput` (ACCORE-BIN-I-K8R4). K
Same constraint as ACCORE-BIN-I-P3M6 — `IBinaryInputBase` interface is synchronous. `ReadAsync().GetAwaiter().GetResult()` blocks when waiting for more data from the pipe. Currently not used in production (SignalR delivers complete messages via `TryParseMessage`). Reserved for future direct-pipe deserialization scenarios.
+## Wire format / Multi-message protocol limits
+
+The multi-message wire format (`AsyncPipeWriterOutput` + `AsyncPipeReaderInput` with `multiMessage = true`) is designed for **a single sequential producer + a single sequential consumer** sharing one long-lived stream. The format is `[201][UINT16 size][data]...[202]` per message, with no stream-id, no message-id, and no out-of-band recovery primitive. Real-world deployments may need patterns the format does not natively support — these limits are not bugs but **architectural decisions documented as constraints**.
+
+### ACCORE-BIN-I-M9P3: No native chunk-multiplexing for concurrent writers
+
+**Status:** Open (intentional limit)
+**Affects:** `AsyncPipeWriterOutput` (multi-message wire format `[201][UINT16][data]...[202]`)
+**Reach:** any deployment that wants multiple producers (or multiple logical streams) to interleave on a single long-lived `PipeWriter`.
+
+**Symptom:** If two threads concurrently call `AcBinarySerializer.SerializeChunkedFramed(..., sharedPipeWriter, ...)` on the same `PipeWriter` without external synchronization, their `[201][UINT16][data]` chunks interleave on the wire. The receiver's `AsyncPipeReaderInput` framing-state-machine has no way to demultiplex — the chunks all flow into one `_buffer`, and the next `Deserialize` reads a corrupted blend of both messages.
+
+**Root cause:** The wire format has no **stream-id** field in the chunk header. `[201][UINT16 size][data]` only encodes the size, not which logical stream the chunk belongs to. A receiver assumes a strictly sequential producer; concurrent producers must serialize externally (e.g. a `SemaphoreSlim` around `SerializeChunkedFramed`).
+
+**Why intentional:** Adding a stream-id would widen the chunk header (extra UINT8/UINT16 per chunk) — measurable wire overhead for the typical case (single producer / sequential RPC pipeline / SignalR-style hub message). The cost-benefit favours keeping the header lean and pushing concurrency to the application layer (one writer per logical stream).
+
+**Workarounds:**
+- **External lock around the writer**: `lock` / `SemaphoreSlim` ensuring at most one in-flight `SerializeChunkedFramed` call per `PipeWriter`. Producer-side concern.
+- **One PipeWriter per logical stream**: each stream gets its own connection / pipe instance. Best fit for transport-level multiplexing (e.g. HTTP/2 streams, multiple WebSocket frames).
+- **Out-of-band protocol layer**: build a stream-id wrapper above `SerializeChunkedFramed` (`[stream-id-byte][serialized-bytes]`); the receiver dispatches to per-stream `AsyncPipeReaderInput` instances. Possible but currently no built-in support.
+
+**Cross-references:**
+- Wire format definition: `AsyncPipeWriterOutput.cs` (`ChunkDataMarker = 201`, `ChunkEndMarker = 202`)
+- The `[200]` CHUNK_START marker is reserved/tolerated but currently has no semantics — could host a future stream-id extension if needed (out-of-scope today)
+
+### ACCORE-BIN-I-T6V2: Single fixed type per long-lived stream
+
+**Status:** Open (intentional limit)
+**Affects:** `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput input, AcBinarySerializerOptions options)`
+**Reach:** any consumer that wants a long-lived `AsyncPipeReaderInput` to receive **mixed-type messages** (e.g. `Request` then `Response` then `Heartbeat` on the same connection — the typical RPC/Hub pattern).
+
+**Symptom:** The `Deserialize` overload is parameterized on a single `T`. If consecutive messages on the same long-lived input are different types, the consumer has no way (within the AcBinary API surface) to know which `T` to pass for the next call.
+
+**Root cause:** AcBinary's wire format does **not** include a type-discriminator before the payload. The serializer writes the object graph directly, and the deserializer must know the target type up-front. This is by design — the format is optimized for size, not for self-description.
+
+**Why intentional:** Type discriminators (4-byte hash, length-prefixed type-name string, etc.) cost wire bytes per message and require shared registries between producer and consumer. The framework keeps these concerns out of the AcBinary core and pushes them to the **dispatch layer** above (where they can be application-tuned: short tags, hash maps, type-id enums).
+
+**Workarounds:**
+- **Tag-based dispatch above AcBinary**: prefix each message with an `int` (or enum) tag the consumer reads first to choose `Deserialize`. This is exactly how `AcBinaryHubProtocol` already does it: `SignalRCrudTags`-style integer tags identify the message class on the wire.
+- **Polymorphic envelope type**: define a single `T = Envelope` containing a discriminator field + raw payload bytes; the consumer deserializes the envelope, switches on the discriminator, and re-deserializes the payload as the concrete type from the inner `byte[]`. Adds a small layer of indirection but works on top of fix-T `Deserialize`.
+- **One input per type-stream**: separate streams per message-class. Practical when the type-set is small and the transport can afford multiple connections.
+
+**Cross-references:**
+- `AcBinaryHubProtocol` uses tag-based dispatch — canonical real-world pattern (see `AyCode.Services/SignalRs/`).
+- `SignalRCrudTags`, `AcSignalRDataSource` — examples of tag-driven message classification.
+
+### ACCORE-BIN-I-Z2X9: Wire-format has no built-in cancel/timeout recovery
+
+**Status:** Open (intentional limit)
+**Affects:** `AsyncPipeReaderInput` framing state machine (multi-message mode)
+**Reach:** any deployment that wants to recover a long-lived stream from a partial/aborted message (sender crash mid-message, network timeout mid-chunk, application cancellation mid-`SerializeChunkedFramed`).
+
+**Symptom:** If a sender starts writing a multi-chunk message — `[201][UINT16=N][partial-data-of-K-bytes-where-K` (or `Dictionary` if also moving to integer tags) populated at server startup with only the application's known DTO types. Reject any `typeName` not in the whitelist. The registry can be auto-populated from `[AcBinarySerializable]`-marked types in registered assemblies if that's an acceptable trust boundary, or maintained by hand for stricter control.
+
+2. **Tag-based dispatch (longer-term)** — replace the wire-format `AssemblyQualifiedName` (variable-length string, ~50-150 bytes) with a fixed-width type-tag (`int` or `varint`, 1-5 bytes). Sender and receiver share a `Dictionary` registry. Smaller wire footprint **and** safer (the tag space is application-defined, not BCL-reachable). Breaking change to the wire format — coordinate with `ACCORE-SBP-I-F6T2` and ongoing protocol evolution.
+
+3. **Type-hash dispatch (compromise)** — keep variable-length lookup but use `FNV-1a(typeName)` as the wire payload (4 bytes) and validate against a server registry. Doesn't eliminate the registry but reduces wire overhead vs. full string.
+
+4. **Authenticate-then-trust deferral** — until a real fix lands, audit the deployment for: (a) authenticated-only Hub access, (b) NO dangerous types reachable from the server's loaded assemblies (audit BCL types in particular). This is **not** a security mitigation, it's a "reduce blast radius" stopgap.
+
+### Related TODO
+
+(To be opened once a fix direction is chosen — likely whitelist-based first, tag-based as a follow-up wire-format evolution.)
diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md
index 87b6e5c..0c59995 100644
--- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md
+++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md
@@ -196,3 +196,36 @@ In `AsyncSegment` mode the total message size is unknown until `CHUNK_END`, so a
### Acceptance criteria
- API sketch.
- Stack-with-encryption order decision documented (industry standard: **encrypt-then-MAC**, but evaluate trade-offs).
+
+## ACCORE-SBP-T-S2H6: Replace `Type.GetType(wire-supplied-name)` with whitelist-based type resolution
+**Priority:** P0 — security blocker · **Type:** Security fix · **Related Issue:** [`SIGNALR_BINARY_PROTOCOL_ISSUES.md#accore-sbp-i-r8k3`](SIGNALR_BINARY_PROTOCOL_ISSUES.md#accore-sbp-i-r8k3-typegettypewire-supplied-name-enables-deserialization-gadget-attack) (full threat model, attack surface, and mitigation analysis there — **do not duplicate here**)
+
+Replace the unsafe `Type.GetType(typeName)` lookup in `AyCodeBinaryHubProtocol`'s argument-binder header parsing with a registry-validated lookup. The deserialization-gadget attack surface this fix closes, the rationale for why the type-resolution mechanism is needed at all, and the four mitigation directions are covered in the issue — **this TODO tracks the chosen implementation path only**.
+
+### Implementation steps
+
+1. **Pick a mitigation direction** (the issue lists four: whitelist, tag-based, type-hash, audit-only stopgap). Recommended starting point: **whitelist registry**, because it can ship without a wire-format change and immediately closes the vulnerability for typical deployments. Tag-based dispatch can land as a follow-up wire-format evolution (separate TODO once direction is committed).
+
+2. **Registry surface design** — likely on `AyCodeBinaryHubProtocolOptions`:
+ - `Action? ConfigureTypeWhitelist { get; init; }` — fluent builder for explicit type registration
+ - Auto-population path: `RegisterAcBinarySerializableTypesFrom(Assembly...)` — scans for `[AcBinarySerializable]`-marked types from declared assemblies (acceptable trust boundary for many apps; ALSO opt-in)
+ - Defaulting: if no whitelist configured, the protocol either (a) refuses to start with a clear error message, or (b) uses an empty whitelist (rejecting all `Type.GetType` lookups). Decide based on backward-compat appetite.
+
+3. **Replace the call site** (`AyCodeBinaryHubProtocol.cs` ~line 114):
+ - Before: `resolvedType = Type.GetType(typeName);`
+ - After: `resolvedType = _typeWhitelist.TryResolve(typeName);` — returns `null` if not registered → propagate as protocol error (reject the message, log with the offending type-name for security audit).
+
+4. **Test plan**:
+ - Unit: whitelist-registered type resolves; non-registered type returns `null` and the protocol throws / logs.
+ - Security regression: pass `System.Diagnostics.Process`, `System.IO.File`-related descriptor types — all rejected.
+ - Backward compat: existing benchmarks / integration tests continue to pass with their DTOs registered.
+ - Versioning: a registered DTO whose assembly version changed (NuGet upgrade) still resolves, because the whitelist is registry-keyed (not assembly-version-keyed) — verify or design accordingly.
+
+### Acceptance criteria
+- The vulnerability described in `R8K3` is no longer reachable: a wire-supplied non-whitelisted type-name results in a protocol error, NOT a `Type.GetType` BCL lookup.
+- The `R8K3` issue is updated with `Status: Fixed` (or `Mitigated` if a chosen direction only partially closes it).
+- A subsequent TODO is opened for tag-based dispatch (wire-format evolution) IF that direction is also pursued — this TODO covers the immediate whitelist-fix only.
+
+### Out of scope (deliberately)
+- Wire-format changes (tag-based / type-hash dispatch). Those are a separate TODO once direction is committed and a wire-format-evolution PR plan exists.
+- Audit of every Hub-method `dataArg : object` use site in consumer projects. The fix is at the protocol level — consumers register their DTOs and the protocol enforces.