[LOADED_DOCS: 2 files, no new loads]

Separate raw and framed streaming in AcBinarySerializer

Refactored AcBinarySerializer and AsyncPipeWriterOutput to support both raw (headerless) and multiplexed/framed ([201][UINT16][data]) streaming wire formats, controlled by a new flag and explicit APIs. Updated AsyncPipeReaderInput and AcBinaryDeserializer to match, with new constructor options and documentation. Expanded tests for both modes and added runtime type detection for flush strategy safety. Minor refactoring and doc improvements throughout.
This commit is contained in:
Loretta 2026-04-29 16:09:33 +02:00
parent 4ca3f51632
commit 910b0deab8
8 changed files with 446 additions and 230 deletions

View File

@ -13,7 +13,7 @@ namespace AyCode.Core.Tests.Serialization;
/// <para>The serializer/deserializer surface intentionally has NO NamedPipe-specific helpers — /// <para>The serializer/deserializer surface intentionally has NO NamedPipe-specific helpers —
/// the tests own the <see cref="NamedPipeServerStream"/> / <see cref="NamedPipeClientStream"/> /// the tests own the <see cref="NamedPipeServerStream"/> / <see cref="NamedPipeClientStream"/>
/// lifecycle directly and call the generic /// lifecycle directly and call the generic
/// <see cref="AcBinarySerializer.Serialize{T}(T, PipeWriter, AcBinarySerializerOptions)"/> + /// <see cref="AcBinarySerializer.SerializeChunked{T}(T, PipeWriter, AcBinarySerializerOptions)"/> +
/// <see cref="AcBinaryDeserializer.DeserializeFromPipeReaderAsync{T}"/> primitives. This proves /// <see cref="AcBinaryDeserializer.DeserializeFromPipeReaderAsync{T}"/> primitives. This proves
/// the streaming framework works on arbitrary <c>PipeWriter</c>/<c>PipeReader</c> sources /// the streaming framework works on arbitrary <c>PipeWriter</c>/<c>PipeReader</c> sources
/// (NamedPipe, FileStream, NetworkStream, custom transports) without per-transport adapters in /// (NamedPipe, FileStream, NetworkStream, custom transports) without per-transport adapters in
@ -29,6 +29,7 @@ public class AcBinarySerializerNamedPipeTests
public async Task RoundTrip_SmallChunkSize_PayloadEquals() public async Task RoundTrip_SmallChunkSize_PayloadEquals()
{ {
var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}"; var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}";
// 256-byte chunk size = Kestrel slab default; small enough to force multi-chunk framing // 256-byte chunk size = Kestrel slab default; small enough to force multi-chunk framing
// for our 50-item payload, exercises the AsyncSegment chunked wire format end-to-end. // for our 50-item payload, exercises the AsyncSegment chunked wire format end-to-end.
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 }; var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
@ -51,6 +52,7 @@ public class AcBinarySerializerNamedPipeTests
#if DEBUG #if DEBUG
// Capture BOTH receiver and sender state to diagnose StreamPipeWriter interaction if needed. // Capture BOTH receiver and sender state to diagnose StreamPipeWriter interaction if needed.
var diagLogs = new List<string>(); var diagLogs = new List<string>();
AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}"); AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}");
AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}"); AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}");
#endif #endif
@ -71,6 +73,7 @@ public class AcBinarySerializerNamedPipeTests
// Deep structure: count items + pallets + measurements + points must match exactly // Deep structure: count items + pallets + measurements + points must match exactly
var origCounts = CountTestOrderHierarchy(original); var origCounts = CountTestOrderHierarchy(original);
var resultCounts = CountTestOrderHierarchy(result); var resultCounts = CountTestOrderHierarchy(result);
Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch"); Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch");
Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch"); Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch");
Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch"); Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch");
@ -81,15 +84,17 @@ public class AcBinarySerializerNamedPipeTests
#if DEBUG #if DEBUG
AsyncPipeReaderInput.DiagnosticLog = null; AsyncPipeReaderInput.DiagnosticLog = null;
AsyncPipeWriterOutput.DiagnosticLog = null; AsyncPipeWriterOutput.DiagnosticLog = null;
if (diagLogs.Count > 0) if (diagLogs.Count > 0)
{ {
Console.WriteLine($"=== Sender [S] + Receiver [R] DiagnosticLog trail ({diagLogs.Count} entries) ==="); Console.WriteLine($"=== Sender [S] + Receiver [R] DiagnosticLog trail ({diagLogs.Count} entries) ===");
// Print last 60 entries (most relevant to failure point) // Print last 60 entries (most relevant to failure point)
var startIdx = Math.Max(0, diagLogs.Count - 60); var startIdx = Math.Max(0, diagLogs.Count - 60);
if (startIdx > 0) if (startIdx > 0) Console.WriteLine($" ... ({startIdx} earlier entries elided)");
Console.WriteLine($" ... ({startIdx} earlier entries elided)");
for (var i = startIdx; i < diagLogs.Count; i++) for (var i = startIdx; i < diagLogs.Count; i++) Console.WriteLine($" [{i}] {diagLogs[i]}");
Console.WriteLine($" [{i}] {diagLogs[i]}");
Console.WriteLine($"=== End DiagnosticLog ==="); Console.WriteLine($"=== End DiagnosticLog ===");
} }
#endif #endif
@ -98,7 +103,7 @@ public class AcBinarySerializerNamedPipeTests
/// <summary> /// <summary>
/// Owns the full NamedPipe lifecycle: binds server, accepts connect, drives the generic /// Owns the full NamedPipe lifecycle: binds server, accepts connect, drives the generic
/// <see cref="AcBinarySerializer.Serialize{T}(T, PipeWriter, AcBinarySerializerOptions)"/> on /// <see cref="AcBinarySerializer.SerializeChunked{T}(T, PipeWriter, AcBinarySerializerOptions)"/> on
/// the client side and <see cref="AcBinaryDeserializer.DeserializeFromPipeReaderAsync{T}"/> /// the client side and <see cref="AcBinaryDeserializer.DeserializeFromPipeReaderAsync{T}"/>
/// on the server side. The framework helpers know nothing about NamedPipe — only PipeWriter / /// on the server side. The framework helpers know nothing about NamedPipe — only PipeWriter /
/// PipeReader. /// PipeReader.
@ -107,7 +112,7 @@ public class AcBinarySerializerNamedPipeTests
{ {
// Server-side bind is synchronous (NamedPipeServerStream ctor registers the pipe with // Server-side bind is synchronous (NamedPipeServerStream ctor registers the pipe with
// the OS), so the client can immediately attempt connect once we hand off to async. // the OS), so the client can immediately attempt connect once we hand off to async.
await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Message, System.IO.Pipes.PipeOptions.Asynchronous);
var receiveTask = Task.Run(async () => var receiveTask = Task.Run(async () =>
{ {
@ -123,9 +128,11 @@ public class AcBinarySerializerNamedPipeTests
var pipeWriter = PipeWriter.Create(pipeClient); var pipeWriter = PipeWriter.Create(pipeClient);
try try
{ {
// Public PipeWriter overload — auto-selects sequential flush strategy because // Public PipeWriter overload (raw chunked stream — no per-chunk frame headers,
// PipeWriter.Create(stream) returns StreamPipeWriter (race-incompatible with parallel send). // bit-compatible with Serialize(v, opts) byte[] output). Auto-selects sequential
AcBinarySerializer.Serialize(original, pipeWriter, opts); // flush strategy because PipeWriter.Create(stream) returns StreamPipeWriter
// (race-incompatible with parallel send).
AcBinarySerializer.SerializeChunked(original, pipeWriter, opts);
} }
finally finally
{ {
@ -155,7 +162,7 @@ public class AcBinarySerializerNamedPipeTests
// Note: a "default chunk size" test was deliberately omitted. The default // Note: a "default chunk size" test was deliberately omitted. The default
// AcBinarySerializerOptions.BufferWriterChunkSize used to be 65536, which exceeded the // AcBinarySerializerOptions.BufferWriterChunkSize used to be 65536, which exceeded the
// UINT16 max (65535). Fixed in this work to 65535. Tests above explicitly set chunk size // UINT16 max (256). Fixed in this work to 256. Tests above explicitly set chunk size
// for reproducibility regardless of default. // for reproducibility regardless of default.
private static TestParentWithDateTimeItemCollection CreatePayload(int itemCount) private static TestParentWithDateTimeItemCollection CreatePayload(int itemCount)

View File

@ -1,5 +1,6 @@
using AyCode.Core.Serializers.Binaries; using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.TestModels; using AyCode.Core.Tests.TestModels;
using System.IO;
using System.IO.Pipelines; using System.IO.Pipelines;
using static AyCode.Core.Tests.TestModels.AcSerializerModels; using static AyCode.Core.Tests.TestModels.AcSerializerModels;
@ -8,17 +9,18 @@ namespace AyCode.Core.Tests.Serialization;
/// <summary> /// <summary>
/// Unit tests for <see cref="AsyncPipeReaderInput"/> (Step 1, ACCORE-BIN-T-D6H4) and the /// Unit tests for <see cref="AsyncPipeReaderInput"/> (Step 1, ACCORE-BIN-T-D6H4) and the
/// <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/> extension (Step 2, ACCORE-BIN-T-M2K1), /// <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/> extension (Step 2, ACCORE-BIN-T-M2K1),
/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9). /// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9), plus runtime type-detect
/// sanity pinning (Step 4).
/// ///
/// <para>The receiver-side <see cref="AsyncPipeReaderInput.Feed"/> is framing-aware: it /// <para>Tests run with <see cref="AsyncPipeReaderInput"/>'s default <c>stripChunkFraming = true</c> —
/// expects the AsyncSegment chunked wire format <c>[201][UINT16 LE size][data]</c> per chunk, /// <see cref="AsyncPipeReaderInput.Feed"/> expects the AsyncSegment chunked wire format
/// tolerates <c>[200]</c> CHUNK_START prefix, and signals end-of-stream on <c>[202]</c> /// <c>[201][UINT16 LE size][data]</c> per chunk, tolerates <c>[200]</c> CHUNK_START prefix, and
/// CHUNK_END. The <see cref="WrapInChunkFrame"/> helper wraps test data into single chunk /// signals end-of-stream on <c>[202]</c> CHUNK_END. The <see cref="WrapInChunkFrame"/> helper
/// frames; multi-chunk tests concatenate multiple frames.</para> /// wraps test data into single chunk frames; multi-chunk tests concatenate multiple frames.</para>
/// ///
/// <para>Wire format identical to <see cref="AsyncPipeWriterOutput"/> output and to SignalR's /// <para>Wire format identical to <see cref="AsyncPipeWriterOutput"/> framed output and to
/// <c>AcBinaryHubProtocol.TryParseChunkData</c> input — unified across all transports per /// SignalR's <c>AcBinaryHubProtocol.TryParseChunkData</c> input — unified across all transports
/// ADR-0003 §9.</para> /// per ADR-0003 §9.</para>
/// </summary> /// </summary>
[TestClass] [TestClass]
public class AcBinarySerializerPipeParallelTests public class AcBinarySerializerPipeParallelTests
@ -72,6 +74,7 @@ public class AcBinarySerializerPipeParallelTests
public void Initialize_AfterFeed_ReturnsAvailableData() public void Initialize_AfterFeed_ReturnsAvailableData()
{ {
using var input = new AsyncPipeReaderInput(64); using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 10, 20, 30 }; var data = new byte[] { 10, 20, 30 };
input.Feed(WrapInChunkFrame(data)); input.Feed(WrapInChunkFrame(data));
@ -88,7 +91,8 @@ public class AcBinarySerializerPipeParallelTests
public void Complete_AllConsumed_TryAdvanceSegmentReturnsFalse() public void Complete_AllConsumed_TryAdvanceSegmentReturnsFalse()
{ {
using var input = new AsyncPipeReaderInput(64); using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 }));
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete(); input.Complete();
// Simulate consumer that has read all 3 bytes // Simulate consumer that has read all 3 bytes
@ -103,16 +107,18 @@ public class AcBinarySerializerPipeParallelTests
public void Complete_WithLeftoverData_TryAdvanceSegmentReturnsTrueWithRemainder() public void Complete_WithLeftoverData_TryAdvanceSegmentReturnsTrueWithRemainder()
{ {
using var input = new AsyncPipeReaderInput(64); using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 }));
input.Feed(WrapInChunkFrame(new byte[] { 4, 5, 6 })); input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Feed(WrapInChunkFrame([4, 5, 6]));
input.Complete(); input.Complete();
// Simulate consumer that has read 3 of 6 bytes — advance should expose the rest // Simulate consumer that has read 3 of 6 bytes — advance should expose the rest
input.Initialize(out var buffer, out var position, out var bufferLength); input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(6, bufferLength); Assert.AreEqual(6, bufferLength);
position = 3;
position = 3;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1); var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsTrue(hasMore); Assert.IsTrue(hasMore);
Assert.AreEqual(3, position); Assert.AreEqual(3, position);
Assert.AreEqual(6, bufferLength); Assert.AreEqual(6, bufferLength);
@ -126,6 +132,7 @@ public class AcBinarySerializerPipeParallelTests
{ {
// Initial capacity = 16, feed > 16 bytes consecutively (no consume between) → forces grow // Initial capacity = 16, feed > 16 bytes consecutively (no consume between) → forces grow
using var input = new AsyncPipeReaderInput(16); using var input = new AsyncPipeReaderInput(16);
var data = new byte[64]; var data = new byte[64];
for (var i = 0; i < data.Length; i++) data[i] = (byte)i; for (var i = 0; i < data.Length; i++) data[i] = (byte)i;
@ -159,6 +166,7 @@ public class AcBinarySerializerPipeParallelTests
while (offset < expected.Length) while (offset < expected.Length)
{ {
var take = Math.Min(chunkSize, expected.Length - offset); var take = Math.Min(chunkSize, expected.Length - offset);
input.Feed(WrapInChunkFrame(expected, offset, take)); input.Feed(WrapInChunkFrame(expected, offset, take));
offset += take; offset += take;
} }
@ -184,6 +192,7 @@ public class AcBinarySerializerPipeParallelTests
using var input = new AsyncPipeReaderInput(32); using var input = new AsyncPipeReaderInput(32);
var expected = new byte[totalBytes]; var expected = new byte[totalBytes];
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF); for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
var consumeTask = Task.Run(() => ConsumeAll(input)); var consumeTask = Task.Run(() => ConsumeAll(input));
@ -210,6 +219,7 @@ public class AcBinarySerializerPipeParallelTests
await Task.WhenAll(consumeTask, produceTask); await Task.WhenAll(consumeTask, produceTask);
var actual = consumeTask.Result; var actual = consumeTask.Result;
Assert.AreEqual(expected.Length, actual.Length); Assert.AreEqual(expected.Length, actual.Length);
CollectionAssert.AreEqual(expected, actual); CollectionAssert.AreEqual(expected, actual);
} }
@ -218,7 +228,7 @@ public class AcBinarySerializerPipeParallelTests
public void Dispose_DoesNotThrow() public void Dispose_DoesNotThrow()
{ {
var input = new AsyncPipeReaderInput(64); var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete(); input.Complete();
input.Dispose(); input.Dispose();
@ -237,12 +247,12 @@ public class AcBinarySerializerPipeParallelTests
// Verifies the framing state machine survives partial frame headers / sizes / data // Verifies the framing state machine survives partial frame headers / sizes / data
// split across multiple Feed calls. // split across multiple Feed calls.
using var input = new AsyncPipeReaderInput(64); using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 10, 20, 30, 40, 50 }; var data = new byte[] { 10, 20, 30, 40, 50 };
var frame = WrapInChunkFrame(data); // 8 bytes total: [201][05][00][10][20][30][40][50] var frame = WrapInChunkFrame(data); // 8 bytes total: [201][05][00][10][20][30][40][50]
// Feed byte-by-byte to stress the state machine // Feed byte-by-byte to stress the state machine
for (var i = 0; i < frame.Length; i++) for (var i = 0; i < frame.Length; i++) input.Feed(frame.AsSpan(i, 1));
input.Feed(frame.AsSpan(i, 1));
input.Complete(); input.Complete();
var consumed = ConsumeAll(input); var consumed = ConsumeAll(input);
@ -254,15 +264,17 @@ public class AcBinarySerializerPipeParallelTests
{ {
// [202] CHUNK_END alone (without external Complete()) should signal end-of-stream. // [202] CHUNK_END alone (without external Complete()) should signal end-of-stream.
using var input = new AsyncPipeReaderInput(64); using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 }));
input.Feed(new byte[] { 202 }); // CHUNK_END marker only — no external Complete() input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Feed([202]); // CHUNK_END marker only — no external Complete()
// Should observe completion: TryAdvanceSegment returns false on empty after consume // Should observe completion: TryAdvanceSegment returns false on empty after consume
input.Initialize(out var buffer, out var position, out var bufferLength); input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(3, bufferLength); Assert.AreEqual(3, bufferLength);
position = bufferLength;
position = bufferLength;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1); var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore); Assert.IsFalse(hasMore);
} }
@ -272,7 +284,7 @@ public class AcBinarySerializerPipeParallelTests
using var input = new AsyncPipeReaderInput(64); using var input = new AsyncPipeReaderInput(64);
// Byte 0x42 is not 200/201/202 — should throw // Byte 0x42 is not 200/201/202 — should throw
_ = Assert.ThrowsExactly<InvalidDataException>(() => input.Feed(new byte[] { 0x42 })); _ = Assert.ThrowsExactly<InvalidDataException>(() => input.Feed([0x42]));
} }
// ==================================================================== // ====================================================================
@ -285,8 +297,7 @@ public class AcBinarySerializerPipeParallelTests
var pipe = new Pipe(); var pipe = new Pipe();
await pipe.Writer.CompleteAsync(); await pipe.Writer.CompleteAsync();
await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader));
await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader));
} }
[TestMethod] [TestMethod]
@ -294,8 +305,7 @@ public class AcBinarySerializerPipeParallelTests
{ {
using var input = new AsyncPipeReaderInput(64); using var input = new AsyncPipeReaderInput(64);
await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await input.DrainFromAsync(null!));
await input.DrainFromAsync(null!));
} }
[TestMethod] [TestMethod]
@ -390,12 +400,12 @@ public class AcBinarySerializerPipeParallelTests
// ==================================================================== // ====================================================================
// Step 3 — Real parallel pipeline test (ACCORE-BIN-T-V7C9) // Step 3 — Real parallel pipeline test (ACCORE-BIN-T-V7C9)
// //
// True 3-task pipeline: AcBinarySerializer writes to pipe.Writer chunk-by-chunk via // True 3-task pipeline: AcBinarySerializer writes framed chunks to pipe.Writer via
// AsyncPipeWriterOutput (under the hood) — drainer pulls from pipe.Reader via // AsyncPipeWriterOutput (framed mode under the hood) — drainer pulls from pipe.Reader
// DrainFromAsync — deserializer reads from AsyncPipeReaderInput. All three run // via DrainFromAsync — deserializer reads from AsyncPipeReaderInput (framing-aware Feed).
// concurrently with TRUE serialize↔deserialize overlap (the serializer is still writing // All three run concurrently with TRUE serialize↔deserialize overlap (the serializer is
// the tail of the message while the deserializer has already consumed the head, courtesy // still writing the tail of the message while the deserializer has already consumed the
// of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput). // head, courtesy of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput).
// //
// BufferWriterChunkSize = 256 → small payloads cross multiple [201][UINT16][data] chunk // BufferWriterChunkSize = 256 → small payloads cross multiple [201][UINT16][data] chunk
// boundaries on the wire, exercising the framing-aware AsyncPipeReaderInput.Feed state // boundaries on the wire, exercising the framing-aware AsyncPipeReaderInput.Feed state
@ -411,8 +421,7 @@ public class AcBinarySerializerPipeParallelTests
var pipe = new Pipe(); var pipe = new Pipe();
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize<TestParentWithDateTimeItemCollection>(input, opts));
AcBinaryDeserializer.Deserialize<TestParentWithDateTimeItemCollection>(input, opts));
var drainTask = input.DrainFromAsync(pipe.Reader); var drainTask = input.DrainFromAsync(pipe.Reader);
@ -420,9 +429,10 @@ public class AcBinarySerializerPipeParallelTests
{ {
try try
{ {
// PipeWriter overload — writes chunked AsyncSegment framing ([201][UINT16][data]). // SerializeChunkedFramed — writes [201][UINT16][data] per chunk on the wire.
// AsyncPipeReaderInput.Feed strips framing internally on the receive side. // AsyncPipeReaderInput.Feed strips framing internally on the receive side
AcBinarySerializer.Serialize(original, pipe.Writer, opts); // (default stripChunkFraming = true).
AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts);
} }
finally finally
{ {
@ -456,7 +466,7 @@ public class AcBinarySerializerPipeParallelTests
var drainTask = input.DrainFromAsync(pipe.Reader); var drainTask = input.DrainFromAsync(pipe.Reader);
var serTask = Task.Run(async () => var serTask = Task.Run(async () =>
{ {
try { AcBinarySerializer.Serialize(original, pipe.Writer, opts); } try { AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); }
finally { await pipe.Writer.CompleteAsync(); } finally { await pipe.Writer.CompleteAsync(); }
}); });
@ -471,6 +481,7 @@ public class AcBinarySerializerPipeParallelTests
var origCounts = CountTestOrderHierarchy(original); var origCounts = CountTestOrderHierarchy(original);
var resultCounts = CountTestOrderHierarchy(result); var resultCounts = CountTestOrderHierarchy(result);
Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch"); Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch");
Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch"); Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch");
Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch"); Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch");
@ -479,7 +490,7 @@ public class AcBinarySerializerPipeParallelTests
private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder order) private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder order)
{ {
int items = order.Items.Count; var items = order.Items.Count;
int pallets = 0, measurements = 0, points = 0; int pallets = 0, measurements = 0, points = 0;
foreach (var item in order.Items) foreach (var item in order.Items)
{ {
@ -487,13 +498,84 @@ public class AcBinarySerializerPipeParallelTests
foreach (var p in item.Pallets) foreach (var p in item.Pallets)
{ {
measurements += p.Measurements.Count; measurements += p.Measurements.Count;
foreach (var m in p.Measurements) points += p.Measurements.Sum(m => m.Points.Count);
points += m.Points.Count;
} }
} }
return (items, pallets, measurements, points); return (items, pallets, measurements, points);
} }
// ====================================================================
// Step 4 — AsyncPipeWriterOutput runtime type detect — sanity pinning
// ====================================================================
//
// Guards the architectural assumption that PipeWriter.Create(Stream).GetType() resolves to a
// different runtime type than new Pipe().Writer.GetType(). This is what makes
// AsyncPipeWriterOutput._serializeFlushAndAcquire auto-select between sequential
// (Stream-backed) and parallel (Pipe-based) flush strategies safe — without touching internal
// BCL type names directly. If a future .NET unifies the two writer impls or renames the
// internal type in a way that breaks the detect, these tests fail before prod.
[TestMethod]
public void StreamPipeWriter_AndPipeWriter_AreDistinctTypes()
{
var pipeBased = new Pipe().Writer.GetType();
var streamBased = PipeWriter.Create(Stream.Null).GetType();
// Cornerstone of the runtime detect — must NEVER unify, else _serializeFlushAndAcquire
// would either always-true or always-false, both of which break correctness.
Assert.AreNotEqual(pipeBased, streamBased,
$"Runtime types unified — pipe-based and stream-backed PipeWriter must remain distinct. " +
$"pipeBased={pipeBased.FullName}, streamBased={streamBased.FullName}");
// Living documentation — typenames printed for debugging on future .NET upgrades.
Console.WriteLine($"Pipe.Writer typename: {pipeBased.FullName}");
Console.WriteLine($"PipeWriter.Create(Stream) typename: {streamBased.FullName}");
}
[TestMethod]
public void StreamPipeWriterTypeField_MatchesFactoryResult()
{
// The static field caches the StreamPipeWriter type via PipeWriter.Create(Stream.Null).GetType()
// at class-load time. A second call to the factory MUST yield the same Type instance —
// otherwise the cache is stale and the runtime detect mis-classifies all stream writers.
var freshType = PipeWriter.Create(Stream.Null).GetType();
Assert.AreSame(freshType, AsyncPipeWriterOutput.StreamPipeWriterType,
"Cached StreamPipeWriterType differs from a fresh factory result — the BCL is " +
"behaving non-deterministically (or the test was loaded before AsyncPipeWriterOutput).");
}
[TestMethod]
public void IsAssignableFrom_PipeBasedWriter_ReturnsFalse()
{
// The Pipe.Writer impl must NOT be a StreamPipeWriter (or subclass thereof) — else
// sequential mode would be wrongly selected and we'd lose the parallelism feature.
var pipeBasedType = new Pipe().Writer.GetType();
Assert.IsFalse(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(pipeBasedType),
$"Pipe.Writer typename={pipeBasedType.FullName} is unexpectedly a StreamPipeWriter " +
$"(or subclass) — runtime detect would mis-classify it as sequential.");
}
[TestMethod]
public void IsAssignableFrom_StreamBackedWriters_ReturnsTrue()
{
// PipeWriter.Create(stream) must always yield a StreamPipeWriter (or subclass) —
// even for unusual stream types (file, memory, null).
Type[] writerTypes =
[
PipeWriter.Create(Stream.Null).GetType(),
PipeWriter.Create(new MemoryStream()).GetType(),
];
foreach (var t in writerTypes)
{
Assert.IsTrue(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(t),
$"PipeWriter.Create(<stream>) returned typename={t.FullName} which is not " +
$"assignable to StreamPipeWriterType — the BCL changed its factory contract.");
}
}
// ==================================================================== // ====================================================================
// Test helpers // Test helpers
// ==================================================================== // ====================================================================
@ -502,15 +584,16 @@ public class AcBinarySerializerPipeParallelTests
/// Wraps a raw payload in a single AsyncSegment chunk frame: <c>[201][UINT16 LE size][data]</c>. /// Wraps a raw payload in a single AsyncSegment chunk frame: <c>[201][UINT16 LE size][data]</c>.
/// Matches the wire format produced by <see cref="AsyncPipeWriterOutput"/> per chunk. /// Matches the wire format produced by <see cref="AsyncPipeWriterOutput"/> per chunk.
/// </summary> /// </summary>
private static byte[] WrapInChunkFrame(byte[] data) private static byte[] WrapInChunkFrame(byte[] data) => WrapInChunkFrame(data, 0, data.Length);
=> WrapInChunkFrame(data, 0, data.Length);
private static byte[] WrapInChunkFrame(byte[] data, int offset, int length) private static byte[] WrapInChunkFrame(byte[] data, int offset, int length)
{ {
var result = new byte[3 + length]; var result = new byte[3 + length];
result[0] = 201; // CHUNK_DATA marker result[0] = 201; // CHUNK_DATA marker
result[1] = (byte)(length & 0xFF); // UINT16 LE size, low byte result[1] = (byte)(length & 0xFF); // UINT16 LE size, low byte
result[2] = (byte)((length >> 8) & 0xFF); // UINT16 LE size, high byte result[2] = (byte)((length >> 8) & 0xFF); // UINT16 LE size, high byte
Array.Copy(data, offset, result, 3, length); Array.Copy(data, offset, result, 3, length);
return result; return result;
} }

View File

@ -48,8 +48,7 @@ public static partial class AcBinaryDeserializer
internal static void Register(Type type, IGeneratedBinaryReader reader) => Readers[type] = reader; internal static void Register(Type type, IGeneratedBinaryReader reader) => Readers[type] = reader;
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static IGeneratedBinaryReader? TryGet(Type type) => internal static IGeneratedBinaryReader? TryGet(Type type) => Readers.GetValueOrDefault(type);
Readers.TryGetValue(type, out var reader) ? reader : null;
} }
/// <summary> /// <summary>
@ -119,17 +118,14 @@ public static partial class AcBinaryDeserializer
readers[BinaryTypeCode.ByteArray] = static (ctx, _, _) => ReadByteArray(ctx); readers[BinaryTypeCode.ByteArray] = static (ctx, _, _) => ReadByteArray(ctx);
// Register FixStr readers // Register FixStr readers
for (byte code = BinaryTypeCode.FixStrBase; code <= BinaryTypeCode.FixStrMax; code++) for (var code = BinaryTypeCode.FixStrBase; code <= BinaryTypeCode.FixStrMax; code++)
{ {
var length = BinaryTypeCode.DecodeFixStrLength(code); var length = BinaryTypeCode.DecodeFixStrLength(code);
readers[code] = CreateFixStrReader<TInput>(length); readers[code] = CreateFixStrReader<TInput>(length);
} }
// Register FixObj slot readers (0..SlotCount-1) // Register FixObj slot readers (0..SlotCount-1)
for (int slot = 0; slot < BinaryTypeCode.SlotCount; slot++) for (var slot = 0; slot < BinaryTypeCode.SlotCount; slot++) readers[slot] = CreateFixObjReader<TInput>(slot);
{
readers[slot] = CreateFixObjReader<TInput>(slot);
}
return readers; return readers;
} }
@ -140,11 +136,9 @@ public static partial class AcBinaryDeserializer
/// Creates a reader for FixStr with the given length. /// Creates a reader for FixStr with the given length.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private static TypeReader<TInput> CreateFixStrReader<TInput>(int length) private static TypeReader<TInput> CreateFixStrReader<TInput>(int length) where TInput : struct, IBinaryInputBase
where TInput : struct, IBinaryInputBase
{ {
if (length == 0) if (length == 0) return static (_, _, _) => string.Empty;
return static (_, _, _) => string.Empty;
return (ctx, _, _) => ctx.ReadStringUtf8(length); return (ctx, _, _) => ctx.ReadStringUtf8(length);
} }
@ -153,13 +147,12 @@ public static partial class AcBinaryDeserializer
/// Creates a reader for FixObj slot (0..SlotCount-1). /// Creates a reader for FixObj slot (0..SlotCount-1).
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private static TypeReader<TInput> CreateFixObjReader<TInput>(int slot) private static TypeReader<TInput> CreateFixObjReader<TInput>(int slot) where TInput : struct, IBinaryInputBase
where TInput : struct, IBinaryInputBase
{ {
return (ctx, targetType, depth) => ReadObjectFromSlot(ctx, slot, targetType, depth); return (ctx, targetType, depth) => ReadObjectFromSlot(ctx, slot, targetType, depth);
} }
private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true); //private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true);
#region Public API #region Public API
@ -184,6 +177,7 @@ public static partial class AcBinaryDeserializer
var context = DeserializationContextPool<ArrayBinaryInput>.Get(options); var context = DeserializationContextPool<ArrayBinaryInput>.Get(options);
context.InitInput(new ArrayBinaryInput(data)); context.InitInput(new ArrayBinaryInput(data));
try { return (T?)DeserializeCore(context, targetType); } try { return (T?)DeserializeCore(context, targetType); }
finally { DeserializationContextPool<ArrayBinaryInput>.Return(context); } finally { DeserializationContextPool<ArrayBinaryInput>.Return(context); }
} }
@ -193,8 +187,7 @@ public static partial class AcBinaryDeserializer
/// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public static T? Deserialize<T>(byte[] data, int offset, int length) public static T? Deserialize<T>(byte[] data, int offset, int length) => Deserialize<T>(data, offset, length, AcBinarySerializerOptions.Default);
=> Deserialize<T>(data, offset, length, AcBinarySerializerOptions.Default);
/// <summary> /// <summary>
/// Deserialize binary data to object of type T from a sub-range with options. /// Deserialize binary data to object of type T from a sub-range with options.
@ -206,11 +199,11 @@ public static partial class AcBinaryDeserializer
if (length == 1 && data[offset] == BinaryTypeCode.Null) return default; if (length == 1 && data[offset] == BinaryTypeCode.Null) return default;
var targetType = typeof(T); var targetType = typeof(T);
if (AcSerializerCommon.IsExpressionType(targetType)) if (AcSerializerCommon.IsExpressionType(targetType)) return (T?)(object?)DeserializeExpression(data, offset, length, targetType, options);
return (T?)(object?)DeserializeExpression(data, offset, length, targetType, options);
var context = DeserializationContextPool<ArrayBinaryInput>.Get(options); var context = DeserializationContextPool<ArrayBinaryInput>.Get(options);
context.InitInput(new ArrayBinaryInput(data, offset, length)); context.InitInput(new ArrayBinaryInput(data, offset, length));
try { return (T?)DeserializeCore(context, targetType); } try { return (T?)DeserializeCore(context, targetType); }
finally { DeserializationContextPool<ArrayBinaryInput>.Return(context); } finally { DeserializationContextPool<ArrayBinaryInput>.Return(context); }
} }
@ -218,14 +211,12 @@ public static partial class AcBinaryDeserializer
/// <summary> /// <summary>
/// Deserialize binary data to specified type. /// Deserialize binary data to specified type.
/// </summary> /// </summary>
public static object? Deserialize(byte[] data, Type targetType) public static object? Deserialize(byte[] data, Type targetType) => Deserialize(data, 0, data.Length, targetType, AcBinarySerializerOptions.Default);
=> Deserialize(data, 0, data.Length, targetType, AcBinarySerializerOptions.Default);
/// <summary> /// <summary>
/// Deserialize binary data to specified type with options. /// Deserialize binary data to specified type with options.
/// </summary> /// </summary>
public static object? Deserialize(byte[] data, Type targetType, AcBinarySerializerOptions options) public static object? Deserialize(byte[] data, Type targetType, AcBinarySerializerOptions options) => Deserialize(data, 0, data.Length, targetType, options);
=> Deserialize(data, 0, data.Length, targetType, options);
/// <summary> /// <summary>
/// Deserialize binary data to specified type from a sub-range. /// Deserialize binary data to specified type from a sub-range.
@ -332,16 +323,21 @@ public static partial class AcBinaryDeserializer
/// <para>Transport-agnostic: works with any <c>PipeReader</c> source — NamedPipe IPC /// <para>Transport-agnostic: works with any <c>PipeReader</c> source — NamedPipe IPC
/// (<c>PipeReader.Create(namedPipeServerStream)</c>), file-stream /// (<c>PipeReader.Create(namedPipeServerStream)</c>), file-stream
/// (<c>PipeReader.Create(fileStream)</c>), TCP (<c>PipeReader.Create(networkStream)</c>), /// (<c>PipeReader.Create(fileStream)</c>), TCP (<c>PipeReader.Create(networkStream)</c>),
/// or custom <c>PipeReader</c> implementations. Strips the <c>[201][UINT16 size][data]</c> /// or custom <c>PipeReader</c> implementations. Reads <b>raw AcBinary bytes</b> verbatim from
/// chunked framing internally via <see cref="AsyncPipeReaderInput.Feed"/>.</para> /// the pipe — no wire-format unwrapping. Pair with the producer-side
/// <see cref="AcBinarySerializer.SerializeChunked{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>
/// (or its <see cref="System.IO.Pipelines.Pipe"/> overload), which writes the same raw byte
/// stream as <see cref="AcBinarySerializer.Serialize{T}(T, AcBinarySerializerOptions)"/>'s
/// <c>byte[]</c> output.</para>
/// ///
/// <para>Receive buffer initial capacity is derived from <c>options.BufferWriterChunkSize × 2</c> /// <para>Receive buffer initial capacity is derived from <c>options.BufferWriterChunkSize × 2</c>
/// — two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the /// — two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the
/// message's lifetime regardless of total payload size.</para> /// message's lifetime regardless of total payload size.</para>
/// ///
/// <para><b>For the producer side</b>: see /// <para><b>For the multiplexed wire format</b> (per-chunk <c>[201][UINT16][data]</c> headers,
/// <see cref="AcBinarySerializer.Serialize{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/> /// produced by <c>SerializeChunkedFramed</c> or SignalR's AsyncSegment mode): the parser
/// or <see cref="AcBinarySerializer.Serialize{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>.</para> /// strips framing on its own (e.g. <c>AcBinaryHubProtocol.TryParseChunkData</c>) and feeds
/// only the data bytes here.</para>
/// </summary> /// </summary>
/// <param name="reader">Source pipe reader. Caller owns lifecycle (creation + completion).</param> /// <param name="reader">Source pipe reader. Caller owns lifecycle (creation + completion).</param>
/// <param name="options">Serializer options. Defaults to <see cref="AcBinarySerializerOptions.Default"/>. /// <param name="options">Serializer options. Defaults to <see cref="AcBinarySerializerOptions.Default"/>.
@ -354,7 +350,11 @@ public static partial class AcBinaryDeserializer
var opts = options ?? AcBinarySerializerOptions.Default; var opts = options ?? AcBinarySerializerOptions.Default;
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); // Raw mode (stripChunkFraming: false) — bytes drained from the PipeReader are forwarded
// verbatim to the deserialization buffer. Pair with AcBinarySerializer.SerializeChunked
// (raw byte stream) on the producer side; for chunked-framed wire formats the parser
// strips framing upstream and feeds only data bytes here.
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2, stripChunkFraming: false);
var deserTask = Task.Run(() => Deserialize<T>(input, opts), ct); var deserTask = Task.Run(() => Deserialize<T>(input, opts), ct);
await input.DrainFromAsync(reader, ct).ConfigureAwait(false); await input.DrainFromAsync(reader, ct).ConfigureAwait(false);
@ -1654,7 +1654,7 @@ public static partial class AcBinaryDeserializer
if (targetType.IsArray) if (targetType.IsArray)
{ {
var array = Array.CreateInstance(elementType, count); var array = Array.CreateInstance(elementType, count);
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var value = ReadValue(context, elementType, nextDepth); var value = ReadValue(context, elementType, nextDepth);
array.SetValue(value, i); array.SetValue(value, i);
@ -1681,7 +1681,7 @@ public static partial class AcBinaryDeserializer
try try
{ {
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var value = ReadValue(context, elementType, nextDepth); var value = ReadValue(context, elementType, nextDepth);
list.Add(value); list.Add(value);
@ -1706,7 +1706,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, IntType)) if (ReferenceEquals(elementType, IntType))
{ {
var array = new int[count]; var array = new int[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (BinaryTypeCode.IsTinyInt(typeCode)) if (BinaryTypeCode.IsTinyInt(typeCode))
@ -1724,7 +1724,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, DoubleType)) if (ReferenceEquals(elementType, DoubleType))
{ {
var array = new double[count]; var array = new double[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.Float64) return null; if (typeCode != BinaryTypeCode.Float64) return null;
@ -1738,7 +1738,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, LongType)) if (ReferenceEquals(elementType, LongType))
{ {
var array = new long[count]; var array = new long[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (BinaryTypeCode.IsTinyInt(typeCode)) if (BinaryTypeCode.IsTinyInt(typeCode))
@ -1758,7 +1758,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, BoolType)) if (ReferenceEquals(elementType, BoolType))
{ {
var array = new bool[count]; var array = new bool[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode == BinaryTypeCode.True) array[i] = true; if (typeCode == BinaryTypeCode.True) array[i] = true;
@ -1773,7 +1773,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, GuidType)) if (ReferenceEquals(elementType, GuidType))
{ {
var array = new Guid[count]; var array = new Guid[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.Guid) return null; if (typeCode != BinaryTypeCode.Guid) return null;
@ -1787,7 +1787,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, DecimalType)) if (ReferenceEquals(elementType, DecimalType))
{ {
var array = new decimal[count]; var array = new decimal[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.Decimal) return null; if (typeCode != BinaryTypeCode.Decimal) return null;
@ -1801,7 +1801,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, DateTimeType)) if (ReferenceEquals(elementType, DateTimeType))
{ {
var array = new DateTime[count]; var array = new DateTime[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.DateTime) return null; if (typeCode != BinaryTypeCode.DateTime) return null;
@ -1815,7 +1815,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, FloatType)) if (ReferenceEquals(elementType, FloatType))
{ {
var array = new float[count]; var array = new float[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.Float32) return null; if (typeCode != BinaryTypeCode.Float32) return null;
@ -1829,7 +1829,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, ShortType)) if (ReferenceEquals(elementType, ShortType))
{ {
var array = new short[count]; var array = new short[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.Int16) return null; if (typeCode != BinaryTypeCode.Int16) return null;
@ -1843,7 +1843,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, UShortType)) if (ReferenceEquals(elementType, UShortType))
{ {
var array = new ushort[count]; var array = new ushort[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.UInt16) return null; if (typeCode != BinaryTypeCode.UInt16) return null;
@ -1857,7 +1857,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, UIntType)) if (ReferenceEquals(elementType, UIntType))
{ {
var array = new uint[count]; var array = new uint[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.UInt32) return null; if (typeCode != BinaryTypeCode.UInt32) return null;
@ -1871,7 +1871,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, ULongType)) if (ReferenceEquals(elementType, ULongType))
{ {
var array = new ulong[count]; var array = new ulong[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.UInt64) return null; if (typeCode != BinaryTypeCode.UInt64) return null;
@ -1885,7 +1885,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, SByteType)) if (ReferenceEquals(elementType, SByteType))
{ {
var array = new sbyte[count]; var array = new sbyte[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.Int8) return null; if (typeCode != BinaryTypeCode.Int8) return null;
@ -1899,7 +1899,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, CharType)) if (ReferenceEquals(elementType, CharType))
{ {
var array = new char[count]; var array = new char[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.Char) return null; if (typeCode != BinaryTypeCode.Char) return null;
@ -1913,7 +1913,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, DateTimeOffsetType)) if (ReferenceEquals(elementType, DateTimeOffsetType))
{ {
var array = new DateTimeOffset[count]; var array = new DateTimeOffset[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.DateTimeOffset) return null; if (typeCode != BinaryTypeCode.DateTimeOffset) return null;
@ -1927,7 +1927,7 @@ public static partial class AcBinaryDeserializer
if (ReferenceEquals(elementType, TimeSpanType)) if (ReferenceEquals(elementType, TimeSpanType))
{ {
var array = new TimeSpan[count]; var array = new TimeSpan[count];
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var typeCode = context.ReadByte(); var typeCode = context.ReadByte();
if (typeCode != BinaryTypeCode.TimeSpan) return null; if (typeCode != BinaryTypeCode.TimeSpan) return null;
@ -1964,7 +1964,7 @@ public static partial class AcBinaryDeserializer
var dict = (IDictionary)Activator.CreateInstance(dictType, count)!; var dict = (IDictionary)Activator.CreateInstance(dictType, count)!;
var nextDepth = depth + 1; var nextDepth = depth + 1;
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
var key = ReadValue(context, keyType, nextDepth); var key = ReadValue(context, keyType, nextDepth);
var value = ReadValue(context, valueType, nextDepth); var value = ReadValue(context, valueType, nextDepth);
@ -2196,7 +2196,7 @@ public static partial class AcBinaryDeserializer
where TInput : struct, IBinaryInputBase where TInput : struct, IBinaryInputBase
{ {
var count = (int)context.ReadVarUInt(); var count = (int)context.ReadVarUInt();
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
SkipValue(context, metaData); SkipValue(context, metaData);
} }
@ -2206,7 +2206,7 @@ public static partial class AcBinaryDeserializer
where TInput : struct, IBinaryInputBase where TInput : struct, IBinaryInputBase
{ {
var count = (int)context.ReadVarUInt(); var count = (int)context.ReadVarUInt();
for (int i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
SkipValue(context, metaData); // key SkipValue(context, metaData); // key
SkipValue(context, metaData); // value SkipValue(context, metaData); // value

View File

@ -423,21 +423,17 @@ public static partial class AcBinarySerializer
} }
/// <summary> /// <summary>
/// Serialize to a <see cref="System.IO.Pipelines.Pipe"/> with chunked protocol framing via /// Serialize to a <see cref="System.IO.Pipelines.Pipe"/> as a chunked stream — pure AcBinary
/// <see cref="AsyncPipeWriterOutput"/> — gives the caller full <paramref name="waitForFlush"/> /// bytes are written via <see cref="AsyncPipeWriterOutput"/> in raw mode (no per-chunk header).
/// + <paramref name="flushTimeout"/> control because <see cref="System.IO.Pipelines.Pipe.Writer"/> /// The output is byte-compatible with <see cref="Serialize{T}(T, AcBinarySerializerOptions)"/>'s
/// is always the BCL <c>PipeWriterImpl</c>, which is parallel-capable (no <c>_tailMemory</c> /// <c>byte[]</c> result; a consumer can drain <c>pipe.Reader</c> and feed the bytes directly to
/// reset race like <c>StreamPipeWriter</c>). /// <see cref="AcBinaryDeserializer"/> (or pipe-them through <c>DeserializeFromPipeReaderAsync</c>)
/// with no extra parser.
/// ///
/// <para>Each chunk (including the last) is framed as <c>[201][UINT16 size][data]</c> and /// <para><b>Why <see cref="System.IO.Pipelines.Pipe"/> instead of <see cref="System.IO.Pipelines.PipeWriter"/>?</b>
/// committed to <c>pipe.Writer</c> via <c>Advance</c> (zero-copy). A consumer drains /// <c>Pipe.Writer</c> is always the BCL <c>PipeWriterImpl</c>, which is parallel-capable
/// <c>pipe.Reader</c> on a background task and writes to the actual transport.</para> /// (no <c>_tailMemory</c> reset race like <c>StreamPipeWriter</c>). This overload exposes the
/// /// <paramref name="waitForFlush"/> + <paramref name="flushTimeout"/> tuning safely.</para>
/// <para><b>Use this overload when</b> you constructed <c>new Pipe()</c> yourself and need
/// runtime tuning of the flush strategy. For arbitrary <see cref="System.IO.Pipelines.PipeWriter"/>
/// (Kestrel transport output, <c>PipeWriter.Create(stream)</c>, custom writers), use the
/// <see cref="Serialize{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>
/// overload.</para>
/// </summary> /// </summary>
/// <param name="value">The value to serialize; <c>null</c> writes a single null marker.</param> /// <param name="value">The value to serialize; <c>null</c> writes a single null marker.</param>
/// <param name="pipe">Target pipe — caller drains <c>pipe.Reader</c> elsewhere.</param> /// <param name="pipe">Target pipe — caller drains <c>pipe.Reader</c> elsewhere.</param>
@ -452,49 +448,108 @@ public static partial class AcBinarySerializer
/// Per-flush timeout. <c>null</c> → wait forever. Positive value: throws /// Per-flush timeout. <c>null</c> → wait forever. Positive value: throws
/// <see cref="TimeoutException"/> on stuck consumers. /// <see cref="TimeoutException"/> on stuck consumers.
/// </param> /// </param>
/// <returns>Total serialized data bytes (excluding framing overhead).</returns> /// <returns>Total serialized bytes written.</returns>
public static int Serialize<T>(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) public static int SerializeChunked<T>(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{ {
if (pipe is null) throw new ArgumentNullException(nameof(pipe)); if (pipe is null) throw new ArgumentNullException(nameof(pipe));
return Serialize(value, pipe.Writer, options, waitForFlush, flushTimeout); return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: false);
} }
/// <summary> /// <summary>
/// Serialize to any <see cref="System.IO.Pipelines.PipeWriter"/> with chunked protocol framing /// Serialize to any <see cref="System.IO.Pipelines.PipeWriter"/> as a chunked stream — pure
/// via <see cref="AsyncPipeWriterOutput"/>. Each chunk (including the last) is framed as /// AcBinary bytes, no per-chunk header. The output is byte-compatible with
/// <c>[201][UINT16 size][data]</c> and committed to the PipeWriter via Advance (zero-copy). /// <see cref="Serialize{T}(T, AcBinarySerializerOptions)"/>'s <c>byte[]</c> result.
/// ///
/// <para><b>Flush strategy is auto-selected by writer type:</b> /// <para><b>Flush strategy auto-selected by writer type</b>: <c>StreamPipeWriter</c>
/// <c>StreamPipeWriter</c> (from <c>PipeWriter.Create(stream)</c> — NamedPipe, FileStream, /// (<c>PipeWriter.Create(stream)</c> — NamedPipe / FileStream / NetworkStream / etc.) runs
/// NetworkStream, etc.) runs sequentially per chunk because the BCL impl resets /// sequentially per chunk because the BCL impl resets <c>_tailMemory</c> on flush completion
/// <c>_tailMemory</c> on flush completion (race-incompatible with parallel send). All other /// (race-incompatible with parallel send). Other PipeWriter implementations (Kestrel transport,
/// PipeWriter implementations (Kestrel transport, custom impls) run with the safe /// custom impls) run with the safe <c>waitForFlush=true</c> default — max parallelism, zero-alloc.</para>
/// <c>waitForFlush=true</c> default — max parallelism, zero-alloc.</para>
/// ///
/// <para><b>Need runtime tuning of the flush strategy?</b> If you control the pipe yourself, /// <para><b>Need runtime tuning of the flush strategy?</b> Build a
/// build a <see cref="System.IO.Pipelines.Pipe"/> instance and use the /// <see cref="System.IO.Pipelines.Pipe"/> instance and use
/// <see cref="Serialize{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/> /// <see cref="SerializeChunked{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/>
/// overload — only Pipe-based writers can guarantee parallel-capable flush behavior.</para> /// — only Pipe-based writers can guarantee parallel-capable flush behavior.</para>
///
/// <para><b>Need a multiplexed wire format with per-chunk frame headers?</b> See
/// <see cref="SerializeChunkedFramed{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>.</para>
/// </summary> /// </summary>
/// <param name="value">The value to serialize; <c>null</c> writes a single null marker.</param> /// <param name="value">The value to serialize; <c>null</c> writes a single null marker.</param>
/// <param name="pipeWriter">Target pipe writer.</param> /// <param name="pipeWriter">Target pipe writer.</param>
/// <param name="options">Serializer options (type wrappers, reference handling, interning, etc.).</param> /// <param name="options">Serializer options (type wrappers, reference handling, interning, etc.).</param>
/// <returns>Total serialized data bytes (excluding framing overhead).</returns> /// <returns>Total serialized bytes written.</returns>
public static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) public static int SerializeChunked<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options)
=> Serialize(value, pipeWriter, options, waitForFlush: true, flushTimeout: null); => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: false);
/// <summary> /// <summary>
/// Internal flush-tunable PipeWriter overload — only callable from <c>AyCode.Services</c> /// Serialize a value into a chunked stream where each chunk carries a self-describing
/// (SignalR hub protocol) because external callers cannot safely choose <paramref name="waitForFlush"/> /// frame header — <c>[201][UINT16 size][data]</c> per chunk, with a final <c>[202]</c>
/// without knowing the concrete <see cref="System.IO.Pipelines.PipeWriter"/> implementation. /// end-of-stream marker. The frame headers let the receiver detect chunk boundaries
/// SignalR uses Kestrel transport output, which is parallel-capable, and forwards the /// incrementally without knowing the total payload size up front, and let multiple
/// hub-protocol-options-configured tuning here. /// independent messages share a single transport with reliable separation.
/// ///
/// <para>For the public API, see the <see cref="System.IO.Pipelines.Pipe"/> overload (parallel-capable, /// <para><b>Use this when</b> building a multiplexed wire protocol where several logical
/// tuning paramters available) or the simple <see cref="System.IO.Pipelines.PipeWriter"/> overload /// messages are interleaved on one stream, when the receiver needs to start deserializing
/// (auto-selects strategy, no tuning).</para> /// as bytes arrive (pipeline parallelism — serialize / network / deserialize overlap), or
/// when the upper layer needs to dispatch each chunk independently. Typical scenarios:
/// real-time RPC, custom Hub-style protocols, event stream multiplexing.</para>
///
/// <para><b>Concrete example</b>: SignalR's <c>BinaryProtocolMode.AsyncSegment</c> uses
/// this exact wire format to interleave many HubMessages over a single connection.</para>
///
/// <para><b>Need a simpler streaming output without per-chunk metadata?</b> Use
/// <see cref="SerializeChunked{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/>
/// — bit-compatible with <see cref="Serialize{T}(T, AcBinarySerializerOptions)"/>'s
/// <c>byte[]</c> output, no extra parser needed on the receive side.</para>
/// </summary>
/// <param name="value">The value to serialize; <c>null</c> writes a single null marker.</param>
/// <param name="pipe">Target pipe — caller drains <c>pipe.Reader</c> elsewhere.</param>
/// <param name="options">Serializer options.</param>
/// <param name="waitForFlush">See <see cref="SerializeChunked{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/>.</param>
/// <param name="flushTimeout">See <see cref="SerializeChunked{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/>.</param>
/// <returns>Total serialized data bytes (excluding framing overhead).</returns>
public static int SerializeChunkedFramed<T>(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{
if (pipe is null) throw new ArgumentNullException(nameof(pipe));
return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: true);
}
/// <summary>
/// Serialize to any <see cref="System.IO.Pipelines.PipeWriter"/> with per-chunk frame headers
/// (multiplexed wire format). See
/// <see cref="SerializeChunkedFramed{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/>
/// for the wire format details and use-cases.
///
/// <para><b>Flush strategy auto-selected by writer type</b> — see
/// <see cref="SerializeChunked{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>.</para>
/// </summary>
public static int SerializeChunkedFramed<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options)
=> SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: true);
/// <summary>
/// Internal flush-tunable framed PipeWriter overload — used by <c>AyCode.Services</c>
/// (SignalR hub protocol) on Kestrel transport output, which is parallel-capable. External
/// callers should use the <see cref="System.IO.Pipelines.Pipe"/> overload to safely tune
/// <paramref name="waitForFlush"/> on a guaranteed parallel-capable writer.
/// </summary>
internal static int SerializeChunkedFramed<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout)
=> SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true);
/// <summary>
/// Internal legacy alias for <see cref="SerializeChunkedFramed{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions, bool, TimeSpan?)"/>
/// — kept until the SignalR hub protocol (<c>AcBinaryHubProtocol.cs</c>) is migrated to the
/// new name in a separate, isolated step. Identical behavior to <c>SerializeChunkedFramed</c>
/// (framed wire format with <c>[201][UINT16][data]</c> per chunk + <c>[202]</c> end marker).
/// </summary> /// </summary>
internal static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) internal static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout)
=> SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true);
/// <summary>
/// Common pipe-output serialization core. Same loop for both raw (<see cref="SerializeChunked{T}"/>)
/// and framed (<see cref="SerializeChunkedFramed{T}"/>) modes — the only difference flows through
/// <paramref name="emitChunkFraming"/> into the <see cref="AsyncPipeWriterOutput"/> ctor.
/// </summary>
private static int SerializeToPipeWriterCore<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout, bool emitChunkFraming)
{ {
if (value == null) if (value == null)
{ {
@ -507,7 +562,8 @@ public static partial class AcBinarySerializer
var runtimeType = value.GetType(); var runtimeType = value.GetType();
var context = BinarySerializationContextPool<AsyncPipeWriterOutput>.Get(options); var context = BinarySerializationContextPool<AsyncPipeWriterOutput>.Get(options);
context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush, flushTimeout);
context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, emitChunkFraming, waitForFlush, flushTimeout);
context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd);
try try

View File

@ -1,9 +1,6 @@
using System;
using System.Buffers; using System.Buffers;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Threading;
namespace AyCode.Core.Serializers.Binaries; namespace AyCode.Core.Serializers.Binaries;
@ -18,13 +15,19 @@ namespace AyCode.Core.Serializers.Binaries;
/// .NET BCL convention for type-level <c>Async</c> prefix (<c>AsyncEnumerable</c>, /// .NET BCL convention for type-level <c>Async</c> prefix (<c>AsyncEnumerable</c>,
/// <c>IAsyncDisposable</c>, <c>AsyncLocal&lt;T&gt;</c>, ...). /// <c>IAsyncDisposable</c>, <c>AsyncLocal&lt;T&gt;</c>, ...).
/// ///
/// <para><see cref="Feed"/> behavior is driven by the <c>stripChunkFraming</c> ctor flag:
/// <c>true</c> (default) — parses <c>[201][UINT16][data]</c> chunked frames + <c>[202]</c> end
/// marker (matches <c>AsyncPipeWriterOutput</c> framed output and SignalR's AsyncSegment wire
/// format); <c>false</c> — appends bytes verbatim (matches <c>AcBinarySerializer.SerializeChunked</c>
/// raw output drained from a <see cref="System.IO.Pipelines.PipeReader"/>).</para>
///
/// <para>Usage modes:</para> /// <para>Usage modes:</para>
/// <list type="bullet"> /// <list type="bullet">
/// <item><b>Push (Feed-API)</b>: producer thread calls <see cref="Feed"/> with chunk bytes /// <item><b>Push (Feed-API)</b>: producer thread calls <see cref="Feed"/> with chunk bytes
/// (typical for SignalR <c>TryParseChunkData</c>).</item> /// (typical for SignalR <c>TryParseChunkData</c>).</item>
/// <item><b>Pull (DrainFromAsync extension)</b>: helper drains a /// <item><b>Pull (DrainFromAsync extension)</b>: helper drains a
/// <see cref="System.IO.Pipelines.PipeReader"/> into the input via repeated /// <see cref="System.IO.Pipelines.PipeReader"/> into the input via repeated
/// <see cref="Feed"/> calls (typical for NamedPipe / FileStream).</item> /// <see cref="Feed"/> calls (typical for NamedPipe / FileStream / NetworkStream).</item>
/// </list> /// </list>
/// ///
/// Backed by a single contiguous <c>byte[]</c> from <see cref="ArrayPool{T}"/>. Positions reset /// Backed by a single contiguous <c>byte[]</c> from <see cref="ArrayPool{T}"/>. Positions reset
@ -57,16 +60,21 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
private int _readPos; // consumer reports consumed position here private int _readPos; // consumer reports consumed position here
private bool _completed; private bool _completed;
// Whether Feed() should strip [201][UINT16][data] chunked framing (true, default — matches
// SignalR-style multiplexed wire) or append bytes verbatim (false — matches the raw output
// of SerializeChunked / single-shot byte[] over PipeWriter).
private readonly bool _stripChunkFraming;
// Framing state machine — parses [201][UINT16 LE size][data] frames + [202] CHUNK_END. // Framing state machine — parses [201][UINT16 LE size][data] frames + [202] CHUNK_END.
// [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's output; // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's framed
// identical parsing logic to AcBinaryHubProtocol.TryParseChunkData but stream-stateful // output and SignalR's AsyncSegment chunked frame format. Only active when
// across Feed(span) boundaries (the SignalR side has ReadOnlySequence<byte> with rewind; // _stripChunkFraming = true.
// we get arbitrary spans).
private const byte ChunkStart = 200; // CHUNK_START — tolerated, skipped private const byte ChunkStart = 200; // CHUNK_START — tolerated, skipped
private const byte ChunkData = 201; // CHUNK_DATA — header followed by [UINT16 size][data] private const byte ChunkData = 201; // CHUNK_DATA — header followed by [UINT16 size][data]
private const byte ChunkEnd = 202; // CHUNK_END — signals end-of-stream private const byte ChunkEnd = 202; // CHUNK_END — signals end-of-stream
private FramingState _framingState = FramingState.AwaitingHeader; private FramingState _framingState = FramingState.AwaitingHeader;
private int _sizeAccumulator; // partial UINT16 size during AwaitingSizeLow/High private int _sizeAccumulator; // partial UINT16 size during AwaitingSizeLow/High
private int _bytesRemainingInChunk; // remaining data bytes in current CHUNK_DATA frame private int _bytesRemainingInChunk; // remaining data bytes in current CHUNK_DATA frame
@ -108,36 +116,54 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
/// 4 KB chunk size, 128 KB for the standalone 64 KB default). /// 4 KB chunk size, 128 KB for the standalone 64 KB default).
/// </summary> /// </summary>
/// <param name="initialCapacity">Initial buffer size. Rounded up by ArrayPool.</param> /// <param name="initialCapacity">Initial buffer size. Rounded up by ArrayPool.</param>
/// <param name="logger">Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds.</param> /// <param name="stripChunkFraming">
public AsyncPipeReaderInput(int initialCapacity) /// <c>true</c> (default): <see cref="Feed"/> parses <c>[201][UINT16][data]</c> chunked frames +
/// <c>[202]</c> end marker (matches <see cref="AsyncPipeWriterOutput"/> framed output and
/// SignalR's AsyncSegment chunked wire format).
/// <c>false</c>: <see cref="Feed"/> appends bytes verbatim — for raw byte streams (matches
/// <c>AcBinarySerializer.SerializeChunked</c> output and the single-shot
/// <c>byte[]</c> output).
/// </param>
public AsyncPipeReaderInput(int initialCapacity, bool stripChunkFraming = true)
{ {
if (initialCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(initialCapacity)); if (initialCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(initialCapacity));
_buffer = ArrayPool<byte>.Shared.Rent(initialCapacity); _buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_stripChunkFraming = stripChunkFraming;
_dataAvailable = new ManualResetEventSlim(false); _dataAvailable = new ManualResetEventSlim(false);
} }
// --- Producer API (push) --- // --- Producer API (push) ---
/// <summary> /// <summary>
/// Feeds raw chunked-wire bytes (any combination of complete or partial /// Feeds bytes into the consumer-visible buffer. Behavior is driven by the
/// <c>[201][UINT16 LE size][data]</c> frames, optional <c>[200]</c> CHUNK_START prefix, /// <c>stripChunkFraming</c> ctor flag:
/// trailing <c>[202]</c> CHUNK_END). Strips framing internally; only the unwrapped /// <list type="bullet">
/// <c>data</c> bytes land in the consumer-visible buffer. /// <item><b>stripChunkFraming = true</b> (default): expects the chunked wire format
/// /// <c>[201][UINT16 LE size][data]</c> per chunk, tolerates <c>[200]</c>
/// <para>State is preserved across <c>Feed</c> calls — partial frame headers, mid-size /// CHUNK_START prefix, and signals end-of-stream on <c>[202]</c> CHUNK_END. State
/// boundaries, and mid-data boundaries all resume correctly on the next call.</para> /// is preserved across <c>Feed</c> calls — partial frame headers, mid-size boundaries,
/// /// and mid-data boundaries all resume correctly. On <c>[202]</c>, sets the completion
/// <para>Wire format identical to <see cref="AsyncPipeWriterOutput"/> output and to /// flag and signals waiting consumers — equivalent to an external <see cref="Complete"/>
/// SignalR's AsyncSegment chunked frame format. Unified across all transports per ADR-0003.</para> /// call. Bytes after <c>[202]</c> are ignored.</item>
/// /// <item><b>stripChunkFraming = false</b>: appends bytes verbatim — no wire-format
/// <para>On <c>[202]</c>, sets the completion flag and signals waiting consumers — equivalent /// interpretation. The producer must pass only payload bytes (e.g. raw byte stream
/// to an external <see cref="Complete"/> call. Bytes after <c>[202]</c> are ignored.</para> /// drained from a <see cref="System.IO.Pipelines.PipeReader"/> paired with
/// <c>AcBinarySerializer.SerializeChunked</c>).</item>
/// </list>
/// </summary> /// </summary>
public void Feed(ReadOnlySpan<byte> data) public void Feed(ReadOnlySpan<byte> data)
{ {
if (data.IsEmpty) return; if (data.IsEmpty) return;
if (!_stripChunkFraming)
{
// Raw mode: append verbatim, no framing interpretation.
AppendToBuffer(data);
return;
}
// Framed mode: state machine parses [201][UINT16 LE size][data] frames + [202] end marker.
var i = 0; var i = 0;
while (i < data.Length) while (i < data.Length)
{ {
@ -214,7 +240,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
} }
/// <summary> /// <summary>
/// Appends unwrapped data bytes to the internal buffer with sliding-window cycling /// Appends data bytes to the internal buffer with sliding-window cycling
/// (reset to 0 when consumer has caught up) and grow-as-last-resort. Signals the consumer. /// (reset to 0 when consumer has caught up) and grow-as-last-resort. Signals the consumer.
/// </summary> /// </summary>
private void AppendToBuffer(ReadOnlySpan<byte> data) private void AppendToBuffer(ReadOnlySpan<byte> data)

View File

@ -1,26 +1,34 @@
using System;
using System.Buffers; using System.Buffers;
using System.Buffers.Binary; using System.Buffers.Binary;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace AyCode.Core.Serializers.Binaries; namespace AyCode.Core.Serializers.Binaries;
/// <summary> /// <summary>
/// Binary output that writes to a PipeWriter with per-chunk self-describing framing. /// Binary output that writes to a PipeWriter chunk-by-chunk via the PipeWriter's natural slabbing.
/// Two wire-format modes are supported, selected by the <c>emitChunkFraming</c> ctor flag:
/// ///
/// Each chunk (including the last) is framed as <c>[201][UINT16 size][data]</c> with a 3-byte header /// <list type="bullet">
/// reserved at the start of each buffer. The serializer context writes into the space after the /// <item><b><c>emitChunkFraming = false</c></b> (raw): pure AcBinary bytes are written into
/// reserved bytes; on <see cref="Grow"/>, the header is patched and the full chunk is committed via /// the PipeWriter's slabs and committed via Advance — no per-chunk header bytes appear on the
/// Advance (zero-copy). <see cref="Flush"/> does the same for the last (partial) chunk. /// wire. Bit-compatible with the single-shot <c>Serialize(value, opts) → byte[]</c> output.
/// The receiver can deserialize the byte stream as-is (e.g. via
/// <c>AcBinaryDeserializer.Deserialize(byte[])</c> after collecting, or any raw
/// <c>PipeReader</c>-based path).</item>
/// ///
/// The protocol layer writes a single <c>[202]</c> byte after all chunks to signal end-of-stream. /// <item><b><c>emitChunkFraming = true</c></b> (framed): each chunk gets a 3-byte header
/// <c>[201][UINT16 size][data]</c>. The header is reserved at the start of each acquired slab;
/// the serializer writes data after it, and on commit the size is patched and the full chunk
/// is Advanced (zero-copy). The protocol layer writes a single <c>[202]</c> byte after all
/// chunks to signal end-of-stream. This is the multiplexed wire format used by SignalR's
/// <c>BinaryProtocolMode.AsyncSegment</c> and any custom multiplexed protocol where the
/// receiver needs incremental chunk-boundary detection.</item>
/// </list>
/// ///
/// <para><b>Backpressure modes</b> (controlled by <c>waitForFlush</c>):</para> /// <para><b>Backpressure modes</b> (controlled by <c>waitForFlush</c>) — independent of framing:</para>
/// <list type="bullet"> /// <list type="bullet">
/// <item><c>waitForFlush=true</c> (default): Grow() waits for the previous FlushAsync before /// <item><c>waitForFlush=true</c> (default): Grow() waits for the previous FlushAsync before
/// starting a new chunk. <b>Pro:</b> maximum pipeline parallelism, guaranteed end-to-end zero-copy. /// starting a new chunk. <b>Pro:</b> maximum pipeline parallelism, guaranteed end-to-end zero-copy.
@ -32,12 +40,18 @@ namespace AyCode.Core.Serializers.Binaries;
/// for that chunk.</item> /// for that chunk.</item>
/// </list> /// </list>
/// ///
/// <para><b>Flush strategy</b> auto-selects on writer type — Stream-backed PipeWriters
/// (<c>PipeWriter.Create(Stream)</c>) run sequentially per chunk because of the
/// <c>StreamPipeWriter._tailMemory</c> reset race; Pipe-based and Kestrel transport writers
/// keep parallelism. Orthogonal to the framing flag: all four (framed/raw) × (sequential/parallel)
/// combinations work.</para>
///
/// <para><b>Timeout safety</b>: every synchronous flush-await is bounded by <c>flushTimeout</c> /// <para><b>Timeout safety</b>: every synchronous flush-await is bounded by <c>flushTimeout</c>
/// (default <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> when the type is used directly; /// (default <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> when the type is used directly;
/// <see cref="AcBinaryHubProtocol"/> passes 10 s from its options). A <see cref="TimeoutException"/> /// <see cref="AcBinaryHubProtocol"/> passes 10 s from its options). A <see cref="TimeoutException"/>
/// propagates to the caller, allowing the connection to abort instead of blocking forever.</para> /// propagates to the caller, allowing the connection to abort instead of blocking forever.</para>
/// ///
/// Maximum chunk data size: 65535 bytes (UINT16 max). /// Maximum chunk data size (in framed mode): 65535 bytes (UINT16 max).
/// </summary> /// </summary>
public struct AsyncPipeWriterOutput : IBinaryOutputBase public struct AsyncPipeWriterOutput : IBinaryOutputBase
{ {
@ -51,16 +65,21 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
public const int MaxChunkSize = ushort.MaxValue; public const int MaxChunkSize = ushort.MaxValue;
/// <summary> /// <summary>
/// Cached <see cref="StreamPipeWriter"/> runtime type, discovered via the public /// Cached <see cref="StreamPipeWriterType"/> runtime type, discovered via the public
/// <see cref="PipeWriter.Create(Stream, StreamPipeWriterOptions)"/> factory at class-load /// <see cref="PipeWriter.Create(Stream, StreamPipeWriterOptions)"/> factory at class-load
/// time (no magic strings, no reflection lookup, refactor-safe — if MS ever renames the /// time (no magic strings, no reflection lookup, refactor-safe — if MS ever renames the
/// internal type, this auto-tracks). The dummy instance is unreachable after class init /// internal type, this auto-tracks). The dummy instance is unreachable after class init
/// and GC-collected; the static field retains only the <see cref="Type"/> reference. /// and GC-collected; the static field retains only the <see cref="Type"/> reference.
///
/// <para><b>internal</b> visibility — exposed for sanity-check tests in
/// <c>AyCode.Core.Tests</c> (verifies <c>new Pipe().Writer.GetType() != StreamPipeWriterType</c>
/// and that the runtime detect can never accidentally fire on Pipe-based writers).</para>
/// </summary> /// </summary>
private static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType(); internal static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType();
private readonly PipeWriter _pipeWriter; private readonly PipeWriter _pipeWriter;
private readonly int _chunkSize; private readonly int _chunkSize;
private readonly bool _emitChunkFraming;
private readonly bool _waitForFlush; private readonly bool _waitForFlush;
private readonly bool _serializeFlushAndAcquire; private readonly bool _serializeFlushAndAcquire;
private readonly TimeSpan _flushTimeout; private readonly TimeSpan _flushTimeout;
@ -80,31 +99,39 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
[Conditional("DEBUG")] [Conditional("DEBUG")]
private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message); private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message);
/// <summary>Creates an output bound to the given PipeWriter with self-describing chunked framing.</summary> /// <summary>Creates an output bound to the given PipeWriter — framed or raw mode per <paramref name="emitChunkFraming"/>.</summary>
/// <param name="pipeWriter">Target pipe (typically Kestrel's transport output for SignalR).</param> /// <param name="pipeWriter">Target pipe (typically Kestrel's transport output for SignalR, NamedPipe, FileStream, or any custom <see cref="PipeWriter"/>).</param>
/// <param name="chunkSize">Per-chunk data size (max <see cref="MaxChunkSize"/>). Default 4 KB matches Kestrel's slab size.</param> /// <param name="chunkSize">Per-chunk data size (max <see cref="MaxChunkSize"/>). Default 4 KB matches Kestrel's slab size.</param>
/// <param name="emitChunkFraming"><c>true</c> → write <c>[201][UINT16][data]</c> per-chunk header (multiplexed wire format).
/// <c>false</c> → raw AcBinary bytes only, byte-compatible with the single-shot <c>byte[]</c> output. See class summary.</param>
/// <param name="waitForFlush">See class summary — pipeline parallelism (true) vs adaptive (false).</param> /// <param name="waitForFlush">See class summary — pipeline parallelism (true) vs adaptive (false).</param>
/// <param name="flushTimeout">Per-flush timeout. <c>null</c> → <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> /// <param name="flushTimeout">Per-flush timeout. <c>null</c> → <see cref="System.Threading.Timeout.InfiniteTimeSpan"/>
/// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers.</param> /// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers.</param>
public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true, TimeSpan? flushTimeout = null) public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool emitChunkFraming = true, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{ {
if (chunkSize > MaxChunkSize) if (chunkSize > MaxChunkSize)
throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max).");
$"Chunk size cannot exceed {MaxChunkSize} (UINT16 max).");
_pipeWriter = pipeWriter; _pipeWriter = pipeWriter;
_chunkSize = chunkSize; _chunkSize = chunkSize;
_emitChunkFraming = emitChunkFraming;
_waitForFlush = waitForFlush; _waitForFlush = waitForFlush;
// null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms). // null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms).
// A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller. // A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller.
_flushTimeout = flushTimeout ?? System.Threading.Timeout.InfiniteTimeSpan; _flushTimeout = flushTimeout ?? Timeout.InfiniteTimeSpan;
// StreamPipeWriter (PipeWriter.Create(Stream)) resets internal _tailMemory to default // StreamPipeWriter (PipeWriter.Create(Stream)) resets internal _tailMemory to default
// at FlushAsync completion — racing with the AcquireChunk-during-flush parallelism this // at FlushAsync completion — racing with the AcquireChunk-during-flush parallelism this
// class deliberately uses. For Stream-backed writers, fully await the just-started flush // class deliberately uses. For Stream-backed writers, fully await the just-started flush
// before acquiring the next chunk's memory (the writer-correct usage pattern; flush is // before acquiring the next chunk's memory (the writer-correct usage pattern; flush is
// a real I/O operation here). Pipe-based writers (Kestrel transport, SignalR) do NOT // a real I/O operation here). Pipe-based writers (Kestrel transport, SignalR) do NOT
// reset state on flush completion → the parallelism feature stays intact for them. // reset state on flush completion → the parallelism feature stays intact for them.
_serializeFlushAndAcquire = pipeWriter.GetType() == StreamPipeWriterType; //
// IsAssignableFrom (not ==) so any future BCL subclass of StreamPipeWriter automatically
// picks the safe sequential path — forward-compat, no silent breakage on .NET upgrades.
_serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter);
_committedBytes = 0; _committedBytes = 0;
_ownedBuffer = false; _ownedBuffer = false;
_lastFlush = default; _lastFlush = default;
@ -125,29 +152,31 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
var task = vt.AsTask(); var task = vt.AsTask();
if (!task.Wait(_flushTimeout)) if (!task.Wait(_flushTimeout))
throw new TimeoutException( throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected.");
$"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " +
"consumer may be too slow, stuck, or disconnected.");
// Completed within timeout — propagate any faulted exception // Completed within timeout — propagate any faulted exception
task.GetAwaiter().GetResult(); task.GetAwaiter().GetResult();
} }
/// <summary> /// <summary>
/// Provides the initial buffer from the PipeWriter with 3-byte header reservation. /// Provides the initial buffer from the PipeWriter — in framed mode, the buffer's first 3
/// bytes are reserved for the chunk header (filled at <see cref="CommitCurrentChunk"/>);
/// in raw mode, no reservation, the data starts at <c>position = 0</c>.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Initialize(out byte[] buffer, out int position, out int bufferEnd) public void Initialize(out byte[] buffer, out int position, out int bufferEnd)
{ {
_committedBytes = 0; _committedBytes = 0;
_lastFlush = default; _lastFlush = default;
AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd);
_currentChunkStart = position; _currentChunkStart = position;
} }
/// <summary> /// <summary>
/// Called when the context's buffer is full. Patches the chunk header [201][UINT16 size], /// Called when the context's buffer is full. Commits the chunk to the PipeWriter (in framed
/// commits the chunk to the PipeWriter, and fires a background flush. /// mode, patches the <c>[201][UINT16 size]</c> header before Advance; in raw mode, simply
/// Advances the data bytes), then fires a background flush and acquires the next chunk.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed)
@ -174,13 +203,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
// growth when the consumer is slow. // growth when the consumer is slow.
// The conditional FlushAsync at the end avoids double-flush if the previous flush // The conditional FlushAsync at the end avoids double-flush if the previous flush
// is still in progress (waitForFlush=false skip path). // is still in progress (waitForFlush=false skip path).
if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush);
SyncAwaitFlush(_lastFlush);
CommitCurrentChunk(buffer, position); CommitCurrentChunk(buffer, position);
if (_lastFlush.IsCompleted) if (_lastFlush.IsCompleted) _lastFlush = _pipeWriter.FlushAsync();
_lastFlush = _pipeWriter.FlushAsync();
} }
// Acquire new chunk with header reservation (common to both paths). // Acquire new chunk with header reservation (common to both paths).
@ -195,9 +222,10 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart); public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart);
/// <summary> /// <summary>
/// Commits the last (partial) chunk to the PipeWriter with [201][UINT16 size] header. /// Commits the last (partial) chunk to the PipeWriter — in framed mode patches the
/// Zero-copy: patches the reserved header bytes and calls Advance — no data copying. /// <c>[201][UINT16 size]</c> header before Advance, in raw mode simply Advances the data.
/// Does NOT flush to network — the protocol writes [202] and flushes after. /// Zero-copy: no data copying. Does NOT flush to network — in framed mode the protocol writes
/// <c>[202]</c> and flushes after.
/// </summary> /// </summary>
public void Flush(byte[] buffer, int position) public void Flush(byte[] buffer, int position)
{ {
@ -213,7 +241,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
public void Reset() { } public void Reset() { }
/// <summary> /// <summary>
/// Patches [201][UINT16 dataBytes] into the reserved header and commits via Advance. /// Commits the current chunk to the PipeWriter. In framed mode, patches the reserved
/// <c>[201][UINT16 dataBytes]</c> header before Advance; in raw mode, simply Advances the data.
/// For owned buffers, copies to PipeWriter first. /// For owned buffers, copies to PipeWriter first.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
@ -222,16 +251,25 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
var dataBytes = position - _currentChunkStart; var dataBytes = position - _currentChunkStart;
if (dataBytes <= 0) return; if (dataBytes <= 0) return;
if (_emitChunkFraming)
{
var headerStart = _currentChunkStart - HeaderSize; var headerStart = _currentChunkStart - HeaderSize;
buffer[headerStart] = ChunkDataMarker; buffer[headerStart] = ChunkDataMarker;
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes);
EmitDiagnostic($"CommitCurrentChunk: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})"); EmitDiagnostic($"CommitCurrentChunk[framed]: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})");
if (_ownedBuffer) if (_ownedBuffer) FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes);
FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); else _pipeWriter.Advance(HeaderSize + dataBytes);
}
else else
_pipeWriter.Advance(HeaderSize + dataBytes); {
EmitDiagnostic($"CommitCurrentChunk[raw]: dataBytes={dataBytes} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({dataBytes})");
if (_ownedBuffer) FlushOwnedBuffer(buffer, _currentChunkStart, dataBytes);
else _pipeWriter.Advance(dataBytes);
}
_committedBytes += dataBytes; // only count data bytes, not framing _committedBytes += dataBytes; // only count data bytes, not framing
} }
@ -240,8 +278,10 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
private void FlushOwnedBuffer(byte[] buffer, int start, int length) private void FlushOwnedBuffer(byte[] buffer, int start, int length)
{ {
var span = _pipeWriter.GetSpan(length); var span = _pipeWriter.GetSpan(length);
buffer.AsSpan(start, length).CopyTo(span); buffer.AsSpan(start, length).CopyTo(span);
_pipeWriter.Advance(length); _pipeWriter.Advance(length);
ArrayPool<byte>.Shared.Return(buffer); ArrayPool<byte>.Shared.Return(buffer);
_ownedBuffer = false; _ownedBuffer = false;
} }
@ -249,16 +289,20 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd)
{ {
var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize); var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize);
var totalRequest = dataSize + HeaderSize;
// Header reservation only in framed mode — raw mode skips it for byte-compat with the
// single-shot byte[] output (each chunk holds pure AcBinary bytes, no markers).
var headerOffset = _emitChunkFraming ? HeaderSize : 0;
var totalRequest = dataSize + headerOffset;
var memory = _pipeWriter.GetMemory(totalRequest); var memory = _pipeWriter.GetMemory(totalRequest);
EmitDiagnostic($"AcquireChunk: requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}"); EmitDiagnostic($"AcquireChunk: framed={_emitChunkFraming} requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}");
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) && segment.Array != null) if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) && segment.Array != null)
{ {
buffer = segment.Array; buffer = segment.Array;
position = segment.Offset + HeaderSize; position = segment.Offset + headerOffset;
bufferEnd = segment.Offset + HeaderSize + dataSize; bufferEnd = segment.Offset + headerOffset + dataSize;
_ownedBuffer = false; _ownedBuffer = false;
EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]"); EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]");
@ -267,8 +311,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
{ {
var owned = ArrayPool<byte>.Shared.Rent(totalRequest); var owned = ArrayPool<byte>.Shared.Rent(totalRequest);
buffer = owned; buffer = owned;
position = HeaderSize; position = headerOffset;
bufferEnd = HeaderSize + dataSize; bufferEnd = headerOffset + dataSize;
_ownedBuffer = true; _ownedBuffer = true;
EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]"); EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]");

View File

@ -139,22 +139,22 @@ public class AcBinaryHubProtocol : IHubProtocol
_options = options.SerializerOptions; _options = options.SerializerOptions;
_options.BufferWriterChunkSize = options.BufferSize; _options.BufferWriterChunkSize = options.BufferSize;
_protocolMode = options.ProtocolMode; _protocolMode = options.ProtocolMode;
_logger = options.Logger; _logger = options.Logger;
_waitForFlush = options.WaitForFlush; _waitForFlush = options.WaitForFlush;
_flushTimeout = options.FlushTimeout; _flushTimeout = options.FlushTimeout;
Name = options.Name; Name = options.Name;
_chunkStates = new ConditionalWeakTable<IInvocationBinder, AsyncChunkState>(); _chunkStates = new ConditionalWeakTable<IInvocationBinder, AsyncChunkState>();
if (_logger != null) _logger?.LogInformation(
{
_logger.LogInformation(
"AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}",
Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity,
_waitForFlush, _flushTimeout.TotalMilliseconds, _waitForFlush, _flushTimeout.TotalMilliseconds,
_options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression);
} }
}
/// <summary> /// <summary>
/// Runtime-replaceable serializer options. /// Runtime-replaceable serializer options.
@ -191,12 +191,9 @@ public class AcBinaryHubProtocol : IHubProtocol
var task = vt.AsTask(); var task = vt.AsTask();
if (!task.Wait(_flushTimeout)) return task.Wait(_flushTimeout)
throw new TimeoutException( ? task.GetAwaiter().GetResult()
$"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + : throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected.");
"consumer may be too slow, stuck, or disconnected.");
return task.GetAwaiter().GetResult();
} }
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
@ -501,10 +498,12 @@ public class AcBinaryHubProtocol : IHubProtocol
chunkStartPayload = bw.Position + externalBytes; chunkStartPayload = bw.Position + externalBytes;
bw.Flush(); bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], chunkStartPayload); Unsafe.WriteUnaligned(ref lengthSpan[0], chunkStartPayload);
_logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload); _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload);
} }
SyncFlush(pipeWriter.FlushAsync()); SyncFlush(pipeWriter.FlushAsync());
// --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
@ -518,6 +517,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var endByte = pipeWriter.GetSpan(1); var endByte = pipeWriter.GetSpan(1);
endByte[0] = MsgAsyncChunkEnd; endByte[0] = MsgAsyncChunkEnd;
pipeWriter.Advance(1); pipeWriter.Advance(1);
SyncFlush(pipeWriter.FlushAsync()); SyncFlush(pipeWriter.FlushAsync());
_logger?.LogTrace("WriteMessageChunked CHUNK_END written"); _logger?.LogTrace("WriteMessageChunked CHUNK_END written");

View File

@ -37,7 +37,7 @@ public sealed class AcBinaryHubProtocolOptions
/// </list> /// </list>
/// Ignored for Bytes and Segment modes. /// Ignored for Bytes and Segment modes.
/// </summary> /// </summary>
public bool WaitForFlush { get; set; } = true; public bool WaitForFlush { get; set; } = false;
/// <summary> /// <summary>
/// Maximum wait for a single synchronous <c>FlushAsync</c> before throwing /// Maximum wait for a single synchronous <c>FlushAsync</c> before throwing