From 910b0deab894b846ed318eb80b81024e99c5347f Mon Sep 17 00:00:00 2001 From: Loretta Date: Wed, 29 Apr 2026 16:09:33 +0200 Subject: [PATCH] [LOADED_DOCS: 2 files, no new loads] Separate raw and framed streaming in AcBinarySerializer Refactored AcBinarySerializer and AsyncPipeWriterOutput to support both raw (headerless) and multiplexed/framed ([201][UINT16][data]) streaming wire formats, controlled by a new flag and explicit APIs. Updated AsyncPipeReaderInput and AcBinaryDeserializer to match, with new constructor options and documentation. Expanded tests for both modes and added runtime type detection for flush strategy safety. Minor refactoring and doc improvements throughout. --- .../AcBinarySerializerNamedPipeTests.cs | 29 +-- .../AcBinarySerializerPipeParallelTests.cs | 165 +++++++++++++----- .../Binaries/AcBinaryDeserializer.cs | 100 +++++------ .../Binaries/AcBinarySerializer.cs | 140 ++++++++++----- .../Binaries/AsyncPipeReaderInput.cs | 74 +++++--- .../Binaries/AsyncPipeWriterOutput.cs | 138 ++++++++++----- .../SignalRs/AcBinaryHubProtocol.cs | 28 +-- .../SignalRs/AcBinaryHubProtocolOptions.cs | 2 +- 8 files changed, 446 insertions(+), 230 deletions(-) diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs index d07f894..104f648 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs @@ -13,7 +13,7 @@ namespace AyCode.Core.Tests.Serialization; /// The serializer/deserializer surface intentionally has NO NamedPipe-specific helpers — /// the tests own the / /// lifecycle directly and call the generic -/// + +/// + /// primitives. This proves /// the streaming framework works on arbitrary PipeWriter/PipeReader sources /// (NamedPipe, FileStream, NetworkStream, custom transports) without per-transport adapters in @@ -29,6 +29,7 @@ public class AcBinarySerializerNamedPipeTests public async Task RoundTrip_SmallChunkSize_PayloadEquals() { var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}"; + // 256-byte chunk size = Kestrel slab default; small enough to force multi-chunk framing // for our 50-item payload, exercises the AsyncSegment chunked wire format end-to-end. var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 }; @@ -51,6 +52,7 @@ public class AcBinarySerializerNamedPipeTests #if DEBUG // Capture BOTH receiver and sender state to diagnose StreamPipeWriter interaction if needed. var diagLogs = new List(); + AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}"); AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}"); #endif @@ -71,6 +73,7 @@ public class AcBinarySerializerNamedPipeTests // Deep structure: count items + pallets + measurements + points must match exactly var origCounts = CountTestOrderHierarchy(original); var resultCounts = CountTestOrderHierarchy(result); + Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch"); Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch"); Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch"); @@ -81,15 +84,17 @@ public class AcBinarySerializerNamedPipeTests #if DEBUG AsyncPipeReaderInput.DiagnosticLog = null; AsyncPipeWriterOutput.DiagnosticLog = null; + if (diagLogs.Count > 0) { Console.WriteLine($"=== Sender [S] + Receiver [R] DiagnosticLog trail ({diagLogs.Count} entries) ==="); + // Print last 60 entries (most relevant to failure point) var startIdx = Math.Max(0, diagLogs.Count - 60); - if (startIdx > 0) - Console.WriteLine($" ... ({startIdx} earlier entries elided)"); - for (var i = startIdx; i < diagLogs.Count; i++) - Console.WriteLine($" [{i}] {diagLogs[i]}"); + if (startIdx > 0) Console.WriteLine($" ... ({startIdx} earlier entries elided)"); + + for (var i = startIdx; i < diagLogs.Count; i++) Console.WriteLine($" [{i}] {diagLogs[i]}"); + Console.WriteLine($"=== End DiagnosticLog ==="); } #endif @@ -98,7 +103,7 @@ public class AcBinarySerializerNamedPipeTests /// /// Owns the full NamedPipe lifecycle: binds server, accepts connect, drives the generic - /// on + /// on /// the client side and /// on the server side. The framework helpers know nothing about NamedPipe — only PipeWriter / /// PipeReader. @@ -107,7 +112,7 @@ public class AcBinarySerializerNamedPipeTests { // Server-side bind is synchronous (NamedPipeServerStream ctor registers the pipe with // the OS), so the client can immediately attempt connect once we hand off to async. - await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); + await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Message, System.IO.Pipes.PipeOptions.Asynchronous); var receiveTask = Task.Run(async () => { @@ -123,9 +128,11 @@ public class AcBinarySerializerNamedPipeTests var pipeWriter = PipeWriter.Create(pipeClient); try { - // Public PipeWriter overload — auto-selects sequential flush strategy because - // PipeWriter.Create(stream) returns StreamPipeWriter (race-incompatible with parallel send). - AcBinarySerializer.Serialize(original, pipeWriter, opts); + // Public PipeWriter overload (raw chunked stream — no per-chunk frame headers, + // bit-compatible with Serialize(v, opts) byte[] output). Auto-selects sequential + // flush strategy because PipeWriter.Create(stream) returns StreamPipeWriter + // (race-incompatible with parallel send). + AcBinarySerializer.SerializeChunked(original, pipeWriter, opts); } finally { @@ -155,7 +162,7 @@ public class AcBinarySerializerNamedPipeTests // Note: a "default chunk size" test was deliberately omitted. The default // AcBinarySerializerOptions.BufferWriterChunkSize used to be 65536, which exceeded the - // UINT16 max (65535). Fixed in this work to 65535. Tests above explicitly set chunk size + // UINT16 max (256). Fixed in this work to 256. Tests above explicitly set chunk size // for reproducibility regardless of default. private static TestParentWithDateTimeItemCollection CreatePayload(int itemCount) diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs index 2435998..4543bd5 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs @@ -1,5 +1,6 @@ using AyCode.Core.Serializers.Binaries; using AyCode.Core.Tests.TestModels; +using System.IO; using System.IO.Pipelines; using static AyCode.Core.Tests.TestModels.AcSerializerModels; @@ -8,17 +9,18 @@ namespace AyCode.Core.Tests.Serialization; /// /// Unit tests for (Step 1, ACCORE-BIN-T-D6H4) and the /// extension (Step 2, ACCORE-BIN-T-M2K1), -/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9). +/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9), plus runtime type-detect +/// sanity pinning (Step 4). /// -/// The receiver-side is framing-aware: it -/// expects the AsyncSegment chunked wire format [201][UINT16 LE size][data] per chunk, -/// tolerates [200] CHUNK_START prefix, and signals end-of-stream on [202] -/// CHUNK_END. The helper wraps test data into single chunk -/// frames; multi-chunk tests concatenate multiple frames. +/// Tests run with 's default stripChunkFraming = true — +/// expects the AsyncSegment chunked wire format +/// [201][UINT16 LE size][data] per chunk, tolerates [200] CHUNK_START prefix, and +/// signals end-of-stream on [202] CHUNK_END. The helper +/// wraps test data into single chunk frames; multi-chunk tests concatenate multiple frames. /// -/// Wire format identical to output and to SignalR's -/// AcBinaryHubProtocol.TryParseChunkData input — unified across all transports per -/// ADR-0003 §9. +/// Wire format identical to framed output and to +/// SignalR's AcBinaryHubProtocol.TryParseChunkData input — unified across all transports +/// per ADR-0003 §9. /// [TestClass] public class AcBinarySerializerPipeParallelTests @@ -72,6 +74,7 @@ public class AcBinarySerializerPipeParallelTests public void Initialize_AfterFeed_ReturnsAvailableData() { using var input = new AsyncPipeReaderInput(64); + var data = new byte[] { 10, 20, 30 }; input.Feed(WrapInChunkFrame(data)); @@ -88,7 +91,8 @@ public class AcBinarySerializerPipeParallelTests public void Complete_AllConsumed_TryAdvanceSegmentReturnsFalse() { using var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); + + input.Feed(WrapInChunkFrame([1, 2, 3])); input.Complete(); // Simulate consumer that has read all 3 bytes @@ -103,16 +107,18 @@ public class AcBinarySerializerPipeParallelTests public void Complete_WithLeftoverData_TryAdvanceSegmentReturnsTrueWithRemainder() { using var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); - input.Feed(WrapInChunkFrame(new byte[] { 4, 5, 6 })); + + input.Feed(WrapInChunkFrame([1, 2, 3])); + input.Feed(WrapInChunkFrame([4, 5, 6])); input.Complete(); // Simulate consumer that has read 3 of 6 bytes — advance should expose the rest input.Initialize(out var buffer, out var position, out var bufferLength); Assert.AreEqual(6, bufferLength); - position = 3; + position = 3; var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1); + Assert.IsTrue(hasMore); Assert.AreEqual(3, position); Assert.AreEqual(6, bufferLength); @@ -126,6 +132,7 @@ public class AcBinarySerializerPipeParallelTests { // Initial capacity = 16, feed > 16 bytes consecutively (no consume between) → forces grow using var input = new AsyncPipeReaderInput(16); + var data = new byte[64]; for (var i = 0; i < data.Length; i++) data[i] = (byte)i; @@ -159,6 +166,7 @@ public class AcBinarySerializerPipeParallelTests while (offset < expected.Length) { var take = Math.Min(chunkSize, expected.Length - offset); + input.Feed(WrapInChunkFrame(expected, offset, take)); offset += take; } @@ -184,6 +192,7 @@ public class AcBinarySerializerPipeParallelTests using var input = new AsyncPipeReaderInput(32); var expected = new byte[totalBytes]; + for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF); var consumeTask = Task.Run(() => ConsumeAll(input)); @@ -210,6 +219,7 @@ public class AcBinarySerializerPipeParallelTests await Task.WhenAll(consumeTask, produceTask); var actual = consumeTask.Result; + Assert.AreEqual(expected.Length, actual.Length); CollectionAssert.AreEqual(expected, actual); } @@ -218,7 +228,7 @@ public class AcBinarySerializerPipeParallelTests public void Dispose_DoesNotThrow() { var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); + input.Feed(WrapInChunkFrame([1, 2, 3])); input.Complete(); input.Dispose(); @@ -237,12 +247,12 @@ public class AcBinarySerializerPipeParallelTests // Verifies the framing state machine survives partial frame headers / sizes / data // split across multiple Feed calls. using var input = new AsyncPipeReaderInput(64); + var data = new byte[] { 10, 20, 30, 40, 50 }; var frame = WrapInChunkFrame(data); // 8 bytes total: [201][05][00][10][20][30][40][50] // Feed byte-by-byte to stress the state machine - for (var i = 0; i < frame.Length; i++) - input.Feed(frame.AsSpan(i, 1)); + for (var i = 0; i < frame.Length; i++) input.Feed(frame.AsSpan(i, 1)); input.Complete(); var consumed = ConsumeAll(input); @@ -254,15 +264,17 @@ public class AcBinarySerializerPipeParallelTests { // [202] CHUNK_END alone (without external Complete()) should signal end-of-stream. using var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); - input.Feed(new byte[] { 202 }); // CHUNK_END marker only — no external Complete() + + input.Feed(WrapInChunkFrame([1, 2, 3])); + input.Feed([202]); // CHUNK_END marker only — no external Complete() // Should observe completion: TryAdvanceSegment returns false on empty after consume input.Initialize(out var buffer, out var position, out var bufferLength); Assert.AreEqual(3, bufferLength); - position = bufferLength; + position = bufferLength; var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1); + Assert.IsFalse(hasMore); } @@ -272,7 +284,7 @@ public class AcBinarySerializerPipeParallelTests using var input = new AsyncPipeReaderInput(64); // Byte 0x42 is not 200/201/202 — should throw - _ = Assert.ThrowsExactly(() => input.Feed(new byte[] { 0x42 })); + _ = Assert.ThrowsExactly(() => input.Feed([0x42])); } // ==================================================================== @@ -285,8 +297,7 @@ public class AcBinarySerializerPipeParallelTests var pipe = new Pipe(); await pipe.Writer.CompleteAsync(); - await Assert.ThrowsExactlyAsync(async () => - await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader)); + await Assert.ThrowsExactlyAsync(async () => await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader)); } [TestMethod] @@ -294,8 +305,7 @@ public class AcBinarySerializerPipeParallelTests { using var input = new AsyncPipeReaderInput(64); - await Assert.ThrowsExactlyAsync(async () => - await input.DrainFromAsync(null!)); + await Assert.ThrowsExactlyAsync(async () => await input.DrainFromAsync(null!)); } [TestMethod] @@ -390,12 +400,12 @@ public class AcBinarySerializerPipeParallelTests // ==================================================================== // Step 3 — Real parallel pipeline test (ACCORE-BIN-T-V7C9) // - // True 3-task pipeline: AcBinarySerializer writes to pipe.Writer chunk-by-chunk via - // AsyncPipeWriterOutput (under the hood) — drainer pulls from pipe.Reader via - // DrainFromAsync — deserializer reads from AsyncPipeReaderInput. All three run - // concurrently with TRUE serialize↔deserialize overlap (the serializer is still writing - // the tail of the message while the deserializer has already consumed the head, courtesy - // of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput). + // True 3-task pipeline: AcBinarySerializer writes framed chunks to pipe.Writer via + // AsyncPipeWriterOutput (framed mode under the hood) — drainer pulls from pipe.Reader + // via DrainFromAsync — deserializer reads from AsyncPipeReaderInput (framing-aware Feed). + // All three run concurrently with TRUE serialize↔deserialize overlap (the serializer is + // still writing the tail of the message while the deserializer has already consumed the + // head, courtesy of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput). // // BufferWriterChunkSize = 256 → small payloads cross multiple [201][UINT16][data] chunk // boundaries on the wire, exercising the framing-aware AsyncPipeReaderInput.Feed state @@ -411,8 +421,7 @@ public class AcBinarySerializerPipeParallelTests var pipe = new Pipe(); using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); - var deserTask = Task.Run(() => - AcBinaryDeserializer.Deserialize(input, opts)); + var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, opts)); var drainTask = input.DrainFromAsync(pipe.Reader); @@ -420,9 +429,10 @@ public class AcBinarySerializerPipeParallelTests { try { - // PipeWriter overload — writes chunked AsyncSegment framing ([201][UINT16][data]). - // AsyncPipeReaderInput.Feed strips framing internally on the receive side. - AcBinarySerializer.Serialize(original, pipe.Writer, opts); + // SerializeChunkedFramed — writes [201][UINT16][data] per chunk on the wire. + // AsyncPipeReaderInput.Feed strips framing internally on the receive side + // (default stripChunkFraming = true). + AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); } finally { @@ -456,7 +466,7 @@ public class AcBinarySerializerPipeParallelTests var drainTask = input.DrainFromAsync(pipe.Reader); var serTask = Task.Run(async () => { - try { AcBinarySerializer.Serialize(original, pipe.Writer, opts); } + try { AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); } finally { await pipe.Writer.CompleteAsync(); } }); @@ -471,6 +481,7 @@ public class AcBinarySerializerPipeParallelTests var origCounts = CountTestOrderHierarchy(original); var resultCounts = CountTestOrderHierarchy(result); + Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch"); Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch"); Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch"); @@ -479,7 +490,7 @@ public class AcBinarySerializerPipeParallelTests private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder order) { - int items = order.Items.Count; + var items = order.Items.Count; int pallets = 0, measurements = 0, points = 0; foreach (var item in order.Items) { @@ -487,13 +498,84 @@ public class AcBinarySerializerPipeParallelTests foreach (var p in item.Pallets) { measurements += p.Measurements.Count; - foreach (var m in p.Measurements) - points += m.Points.Count; + points += p.Measurements.Sum(m => m.Points.Count); } } return (items, pallets, measurements, points); } + // ==================================================================== + // Step 4 — AsyncPipeWriterOutput runtime type detect — sanity pinning + // ==================================================================== + // + // Guards the architectural assumption that PipeWriter.Create(Stream).GetType() resolves to a + // different runtime type than new Pipe().Writer.GetType(). This is what makes + // AsyncPipeWriterOutput._serializeFlushAndAcquire auto-select between sequential + // (Stream-backed) and parallel (Pipe-based) flush strategies safe — without touching internal + // BCL type names directly. If a future .NET unifies the two writer impls or renames the + // internal type in a way that breaks the detect, these tests fail before prod. + + [TestMethod] + public void StreamPipeWriter_AndPipeWriter_AreDistinctTypes() + { + var pipeBased = new Pipe().Writer.GetType(); + var streamBased = PipeWriter.Create(Stream.Null).GetType(); + + // Cornerstone of the runtime detect — must NEVER unify, else _serializeFlushAndAcquire + // would either always-true or always-false, both of which break correctness. + Assert.AreNotEqual(pipeBased, streamBased, + $"Runtime types unified — pipe-based and stream-backed PipeWriter must remain distinct. " + + $"pipeBased={pipeBased.FullName}, streamBased={streamBased.FullName}"); + + // Living documentation — typenames printed for debugging on future .NET upgrades. + Console.WriteLine($"Pipe.Writer typename: {pipeBased.FullName}"); + Console.WriteLine($"PipeWriter.Create(Stream) typename: {streamBased.FullName}"); + } + + [TestMethod] + public void StreamPipeWriterTypeField_MatchesFactoryResult() + { + // The static field caches the StreamPipeWriter type via PipeWriter.Create(Stream.Null).GetType() + // at class-load time. A second call to the factory MUST yield the same Type instance — + // otherwise the cache is stale and the runtime detect mis-classifies all stream writers. + var freshType = PipeWriter.Create(Stream.Null).GetType(); + + Assert.AreSame(freshType, AsyncPipeWriterOutput.StreamPipeWriterType, + "Cached StreamPipeWriterType differs from a fresh factory result — the BCL is " + + "behaving non-deterministically (or the test was loaded before AsyncPipeWriterOutput)."); + } + + [TestMethod] + public void IsAssignableFrom_PipeBasedWriter_ReturnsFalse() + { + // The Pipe.Writer impl must NOT be a StreamPipeWriter (or subclass thereof) — else + // sequential mode would be wrongly selected and we'd lose the parallelism feature. + var pipeBasedType = new Pipe().Writer.GetType(); + + Assert.IsFalse(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(pipeBasedType), + $"Pipe.Writer typename={pipeBasedType.FullName} is unexpectedly a StreamPipeWriter " + + $"(or subclass) — runtime detect would mis-classify it as sequential."); + } + + [TestMethod] + public void IsAssignableFrom_StreamBackedWriters_ReturnsTrue() + { + // PipeWriter.Create(stream) must always yield a StreamPipeWriter (or subclass) — + // even for unusual stream types (file, memory, null). + Type[] writerTypes = + [ + PipeWriter.Create(Stream.Null).GetType(), + PipeWriter.Create(new MemoryStream()).GetType(), + ]; + + foreach (var t in writerTypes) + { + Assert.IsTrue(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(t), + $"PipeWriter.Create() returned typename={t.FullName} which is not " + + $"assignable to StreamPipeWriterType — the BCL changed its factory contract."); + } + } + // ==================================================================== // Test helpers // ==================================================================== @@ -502,15 +584,16 @@ public class AcBinarySerializerPipeParallelTests /// Wraps a raw payload in a single AsyncSegment chunk frame: [201][UINT16 LE size][data]. /// Matches the wire format produced by per chunk. /// - private static byte[] WrapInChunkFrame(byte[] data) - => WrapInChunkFrame(data, 0, data.Length); + private static byte[] WrapInChunkFrame(byte[] data) => WrapInChunkFrame(data, 0, data.Length); private static byte[] WrapInChunkFrame(byte[] data, int offset, int length) { var result = new byte[3 + length]; + result[0] = 201; // CHUNK_DATA marker result[1] = (byte)(length & 0xFF); // UINT16 LE size, low byte result[2] = (byte)((length >> 8) & 0xFF); // UINT16 LE size, high byte + Array.Copy(data, offset, result, 3, length); return result; } diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index ecce44e..fe9f8fb 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -48,8 +48,7 @@ public static partial class AcBinaryDeserializer internal static void Register(Type type, IGeneratedBinaryReader reader) => Readers[type] = reader; [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static IGeneratedBinaryReader? TryGet(Type type) => - Readers.TryGetValue(type, out var reader) ? reader : null; + internal static IGeneratedBinaryReader? TryGet(Type type) => Readers.GetValueOrDefault(type); } /// @@ -119,18 +118,15 @@ public static partial class AcBinaryDeserializer readers[BinaryTypeCode.ByteArray] = static (ctx, _, _) => ReadByteArray(ctx); // Register FixStr readers - for (byte code = BinaryTypeCode.FixStrBase; code <= BinaryTypeCode.FixStrMax; code++) + for (var code = BinaryTypeCode.FixStrBase; code <= BinaryTypeCode.FixStrMax; code++) { var length = BinaryTypeCode.DecodeFixStrLength(code); readers[code] = CreateFixStrReader(length); } // Register FixObj slot readers (0..SlotCount-1) - for (int slot = 0; slot < BinaryTypeCode.SlotCount; slot++) - { - readers[slot] = CreateFixObjReader(slot); - } - + for (var slot = 0; slot < BinaryTypeCode.SlotCount; slot++) readers[slot] = CreateFixObjReader(slot); + return readers; } } @@ -140,11 +136,9 @@ public static partial class AcBinaryDeserializer /// Creates a reader for FixStr with the given length. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static TypeReader CreateFixStrReader(int length) - where TInput : struct, IBinaryInputBase + private static TypeReader CreateFixStrReader(int length) where TInput : struct, IBinaryInputBase { - if (length == 0) - return static (_, _, _) => string.Empty; + if (length == 0) return static (_, _, _) => string.Empty; return (ctx, _, _) => ctx.ReadStringUtf8(length); } @@ -153,13 +147,12 @@ public static partial class AcBinaryDeserializer /// Creates a reader for FixObj slot (0..SlotCount-1). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static TypeReader CreateFixObjReader(int slot) - where TInput : struct, IBinaryInputBase + private static TypeReader CreateFixObjReader(int slot) where TInput : struct, IBinaryInputBase { return (ctx, targetType, depth) => ReadObjectFromSlot(ctx, slot, targetType, depth); } - private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true); + //private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true); #region Public API @@ -184,6 +177,7 @@ public static partial class AcBinaryDeserializer var context = DeserializationContextPool.Get(options); context.InitInput(new ArrayBinaryInput(data)); + try { return (T?)DeserializeCore(context, targetType); } finally { DeserializationContextPool.Return(context); } } @@ -193,8 +187,7 @@ public static partial class AcBinaryDeserializer /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static T? Deserialize(byte[] data, int offset, int length) - => Deserialize(data, offset, length, AcBinarySerializerOptions.Default); + public static T? Deserialize(byte[] data, int offset, int length) => Deserialize(data, offset, length, AcBinarySerializerOptions.Default); /// /// Deserialize binary data to object of type T from a sub-range with options. @@ -206,11 +199,11 @@ public static partial class AcBinaryDeserializer if (length == 1 && data[offset] == BinaryTypeCode.Null) return default; var targetType = typeof(T); - if (AcSerializerCommon.IsExpressionType(targetType)) - return (T?)(object?)DeserializeExpression(data, offset, length, targetType, options); + if (AcSerializerCommon.IsExpressionType(targetType)) return (T?)(object?)DeserializeExpression(data, offset, length, targetType, options); var context = DeserializationContextPool.Get(options); context.InitInput(new ArrayBinaryInput(data, offset, length)); + try { return (T?)DeserializeCore(context, targetType); } finally { DeserializationContextPool.Return(context); } } @@ -218,14 +211,12 @@ public static partial class AcBinaryDeserializer /// /// Deserialize binary data to specified type. /// - public static object? Deserialize(byte[] data, Type targetType) - => Deserialize(data, 0, data.Length, targetType, AcBinarySerializerOptions.Default); + public static object? Deserialize(byte[] data, Type targetType) => Deserialize(data, 0, data.Length, targetType, AcBinarySerializerOptions.Default); /// /// Deserialize binary data to specified type with options. /// - public static object? Deserialize(byte[] data, Type targetType, AcBinarySerializerOptions options) - => Deserialize(data, 0, data.Length, targetType, options); + public static object? Deserialize(byte[] data, Type targetType, AcBinarySerializerOptions options) => Deserialize(data, 0, data.Length, targetType, options); /// /// Deserialize binary data to specified type from a sub-range. @@ -332,16 +323,21 @@ public static partial class AcBinaryDeserializer /// Transport-agnostic: works with any PipeReader source — NamedPipe IPC /// (PipeReader.Create(namedPipeServerStream)), file-stream /// (PipeReader.Create(fileStream)), TCP (PipeReader.Create(networkStream)), - /// or custom PipeReader implementations. Strips the [201][UINT16 size][data] - /// chunked framing internally via . + /// or custom PipeReader implementations. Reads raw AcBinary bytes verbatim from + /// the pipe — no wire-format unwrapping. Pair with the producer-side + /// + /// (or its overload), which writes the same raw byte + /// stream as 's + /// byte[] output. /// /// Receive buffer initial capacity is derived from options.BufferWriterChunkSize × 2 /// — two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the /// message's lifetime regardless of total payload size. /// - /// For the producer side: see - /// - /// or . + /// For the multiplexed wire format (per-chunk [201][UINT16][data] headers, + /// produced by SerializeChunkedFramed or SignalR's AsyncSegment mode): the parser + /// strips framing on its own (e.g. AcBinaryHubProtocol.TryParseChunkData) and feeds + /// only the data bytes here. /// /// Source pipe reader. Caller owns lifecycle (creation + completion). /// Serializer options. Defaults to . @@ -354,7 +350,11 @@ public static partial class AcBinaryDeserializer var opts = options ?? AcBinarySerializerOptions.Default; - using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); + // Raw mode (stripChunkFraming: false) — bytes drained from the PipeReader are forwarded + // verbatim to the deserialization buffer. Pair with AcBinarySerializer.SerializeChunked + // (raw byte stream) on the producer side; for chunked-framed wire formats the parser + // strips framing upstream and feeds only data bytes here. + using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2, stripChunkFraming: false); var deserTask = Task.Run(() => Deserialize(input, opts), ct); await input.DrainFromAsync(reader, ct).ConfigureAwait(false); @@ -1654,7 +1654,7 @@ public static partial class AcBinaryDeserializer if (targetType.IsArray) { var array = Array.CreateInstance(elementType, count); - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var value = ReadValue(context, elementType, nextDepth); array.SetValue(value, i); @@ -1681,7 +1681,7 @@ public static partial class AcBinaryDeserializer try { - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var value = ReadValue(context, elementType, nextDepth); list.Add(value); @@ -1706,7 +1706,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, IntType)) { var array = new int[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (BinaryTypeCode.IsTinyInt(typeCode)) @@ -1724,7 +1724,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DoubleType)) { var array = new double[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Float64) return null; @@ -1738,7 +1738,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, LongType)) { var array = new long[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (BinaryTypeCode.IsTinyInt(typeCode)) @@ -1758,7 +1758,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, BoolType)) { var array = new bool[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode == BinaryTypeCode.True) array[i] = true; @@ -1773,7 +1773,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, GuidType)) { var array = new Guid[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Guid) return null; @@ -1787,7 +1787,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DecimalType)) { var array = new decimal[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Decimal) return null; @@ -1801,7 +1801,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DateTimeType)) { var array = new DateTime[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.DateTime) return null; @@ -1815,7 +1815,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, FloatType)) { var array = new float[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Float32) return null; @@ -1829,7 +1829,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, ShortType)) { var array = new short[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Int16) return null; @@ -1843,7 +1843,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, UShortType)) { var array = new ushort[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.UInt16) return null; @@ -1857,7 +1857,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, UIntType)) { var array = new uint[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.UInt32) return null; @@ -1871,7 +1871,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, ULongType)) { var array = new ulong[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.UInt64) return null; @@ -1885,7 +1885,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, SByteType)) { var array = new sbyte[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Int8) return null; @@ -1899,7 +1899,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, CharType)) { var array = new char[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Char) return null; @@ -1913,7 +1913,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DateTimeOffsetType)) { var array = new DateTimeOffset[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.DateTimeOffset) return null; @@ -1927,7 +1927,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, TimeSpanType)) { var array = new TimeSpan[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.TimeSpan) return null; @@ -1964,7 +1964,7 @@ public static partial class AcBinaryDeserializer var dict = (IDictionary)Activator.CreateInstance(dictType, count)!; var nextDepth = depth + 1; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var key = ReadValue(context, keyType, nextDepth); var value = ReadValue(context, valueType, nextDepth); @@ -2196,7 +2196,7 @@ public static partial class AcBinaryDeserializer where TInput : struct, IBinaryInputBase { var count = (int)context.ReadVarUInt(); - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { SkipValue(context, metaData); } @@ -2206,7 +2206,7 @@ public static partial class AcBinaryDeserializer where TInput : struct, IBinaryInputBase { var count = (int)context.ReadVarUInt(); - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { SkipValue(context, metaData); // key SkipValue(context, metaData); // value diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 1a95d48..f635f95 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -423,21 +423,17 @@ public static partial class AcBinarySerializer } /// - /// Serialize to a with chunked protocol framing via - /// — gives the caller full - /// + control because - /// is always the BCL PipeWriterImpl, which is parallel-capable (no _tailMemory - /// reset race like StreamPipeWriter). + /// Serialize to a as a chunked stream — pure AcBinary + /// bytes are written via in raw mode (no per-chunk header). + /// The output is byte-compatible with 's + /// byte[] result; a consumer can drain pipe.Reader and feed the bytes directly to + /// (or pipe-them through DeserializeFromPipeReaderAsync) + /// with no extra parser. /// - /// Each chunk (including the last) is framed as [201][UINT16 size][data] and - /// committed to pipe.Writer via Advance (zero-copy). A consumer drains - /// pipe.Reader on a background task and writes to the actual transport. - /// - /// Use this overload when you constructed new Pipe() yourself and need - /// runtime tuning of the flush strategy. For arbitrary - /// (Kestrel transport output, PipeWriter.Create(stream), custom writers), use the - /// - /// overload. + /// Why instead of ? + /// Pipe.Writer is always the BCL PipeWriterImpl, which is parallel-capable + /// (no _tailMemory reset race like StreamPipeWriter). This overload exposes the + /// + tuning safely. /// /// The value to serialize; null writes a single null marker. /// Target pipe — caller drains pipe.Reader elsewhere. @@ -452,49 +448,108 @@ public static partial class AcBinarySerializer /// Per-flush timeout. null → wait forever. Positive value: throws /// on stuck consumers. /// - /// Total serialized data bytes (excluding framing overhead). - public static int Serialize(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + /// Total serialized bytes written. + public static int SerializeChunked(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) { if (pipe is null) throw new ArgumentNullException(nameof(pipe)); - return Serialize(value, pipe.Writer, options, waitForFlush, flushTimeout); + return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: false); } /// - /// Serialize to any with chunked protocol framing - /// via . Each chunk (including the last) is framed as - /// [201][UINT16 size][data] and committed to the PipeWriter via Advance (zero-copy). + /// Serialize to any as a chunked stream — pure + /// AcBinary bytes, no per-chunk header. The output is byte-compatible with + /// 's byte[] result. /// - /// Flush strategy is auto-selected by writer type: - /// StreamPipeWriter (from PipeWriter.Create(stream) — NamedPipe, FileStream, - /// NetworkStream, etc.) runs sequentially per chunk because the BCL impl resets - /// _tailMemory on flush completion (race-incompatible with parallel send). All other - /// PipeWriter implementations (Kestrel transport, custom impls) run with the safe - /// waitForFlush=true default — max parallelism, zero-alloc. + /// Flush strategy auto-selected by writer type: StreamPipeWriter + /// (PipeWriter.Create(stream) — NamedPipe / FileStream / NetworkStream / etc.) runs + /// sequentially per chunk because the BCL impl resets _tailMemory on flush completion + /// (race-incompatible with parallel send). Other PipeWriter implementations (Kestrel transport, + /// custom impls) run with the safe waitForFlush=true default — max parallelism, zero-alloc. /// - /// Need runtime tuning of the flush strategy? If you control the pipe yourself, - /// build a instance and use the - /// - /// overload — only Pipe-based writers can guarantee parallel-capable flush behavior. + /// Need runtime tuning of the flush strategy? Build a + /// instance and use + /// + /// — only Pipe-based writers can guarantee parallel-capable flush behavior. + /// + /// Need a multiplexed wire format with per-chunk frame headers? See + /// . /// /// The value to serialize; null writes a single null marker. /// Target pipe writer. /// Serializer options (type wrappers, reference handling, interning, etc.). - /// Total serialized data bytes (excluding framing overhead). - public static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) - => Serialize(value, pipeWriter, options, waitForFlush: true, flushTimeout: null); + /// Total serialized bytes written. + public static int SerializeChunked(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: false); /// - /// Internal flush-tunable PipeWriter overload — only callable from AyCode.Services - /// (SignalR hub protocol) because external callers cannot safely choose - /// without knowing the concrete implementation. - /// SignalR uses Kestrel transport output, which is parallel-capable, and forwards the - /// hub-protocol-options-configured tuning here. + /// Serialize a value into a chunked stream where each chunk carries a self-describing + /// frame header — [201][UINT16 size][data] per chunk, with a final [202] + /// end-of-stream marker. The frame headers let the receiver detect chunk boundaries + /// incrementally without knowing the total payload size up front, and let multiple + /// independent messages share a single transport with reliable separation. /// - /// For the public API, see the overload (parallel-capable, - /// tuning paramters available) or the simple overload - /// (auto-selects strategy, no tuning). + /// Use this when building a multiplexed wire protocol where several logical + /// messages are interleaved on one stream, when the receiver needs to start deserializing + /// as bytes arrive (pipeline parallelism — serialize / network / deserialize overlap), or + /// when the upper layer needs to dispatch each chunk independently. Typical scenarios: + /// real-time RPC, custom Hub-style protocols, event stream multiplexing. + /// + /// Concrete example: SignalR's BinaryProtocolMode.AsyncSegment uses + /// this exact wire format to interleave many HubMessages over a single connection. + /// + /// Need a simpler streaming output without per-chunk metadata? Use + /// + /// — bit-compatible with 's + /// byte[] output, no extra parser needed on the receive side. + /// + /// The value to serialize; null writes a single null marker. + /// Target pipe — caller drains pipe.Reader elsewhere. + /// Serializer options. + /// See . + /// See . + /// Total serialized data bytes (excluding framing overhead). + public static int SerializeChunkedFramed(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + { + if (pipe is null) throw new ArgumentNullException(nameof(pipe)); + return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: true); + } + + /// + /// Serialize to any with per-chunk frame headers + /// (multiplexed wire format). See + /// + /// for the wire format details and use-cases. + /// + /// Flush strategy auto-selected by writer type — see + /// . + /// + public static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: true); + + /// + /// Internal flush-tunable framed PipeWriter overload — used by AyCode.Services + /// (SignalR hub protocol) on Kestrel transport output, which is parallel-capable. External + /// callers should use the overload to safely tune + /// on a guaranteed parallel-capable writer. + /// + internal static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true); + + /// + /// Internal legacy alias for + /// — kept until the SignalR hub protocol (AcBinaryHubProtocol.cs) is migrated to the + /// new name in a separate, isolated step. Identical behavior to SerializeChunkedFramed + /// (framed wire format with [201][UINT16][data] per chunk + [202] end marker). /// internal static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true); + + /// + /// Common pipe-output serialization core. Same loop for both raw () + /// and framed () modes — the only difference flows through + /// into the ctor. + /// + private static int SerializeToPipeWriterCore(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout, bool emitChunkFraming) { if (value == null) { @@ -507,7 +562,8 @@ public static partial class AcBinarySerializer var runtimeType = value.GetType(); var context = BinarySerializationContextPool.Get(options); - context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush, flushTimeout); + + context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, emitChunkFraming, waitForFlush, flushTimeout); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); try diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs index 683ae57..7245688 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs @@ -1,9 +1,6 @@ -using System; using System.Buffers; using System.Diagnostics; -using System.IO; using System.Runtime.CompilerServices; -using System.Threading; namespace AyCode.Core.Serializers.Binaries; @@ -18,13 +15,19 @@ namespace AyCode.Core.Serializers.Binaries; /// .NET BCL convention for type-level Async prefix (AsyncEnumerable, /// IAsyncDisposable, AsyncLocal<T>, ...). /// +/// behavior is driven by the stripChunkFraming ctor flag: +/// true (default) — parses [201][UINT16][data] chunked frames + [202] end +/// marker (matches AsyncPipeWriterOutput framed output and SignalR's AsyncSegment wire +/// format); false — appends bytes verbatim (matches AcBinarySerializer.SerializeChunked +/// raw output drained from a ). +/// /// Usage modes: /// /// Push (Feed-API): producer thread calls with chunk bytes /// (typical for SignalR TryParseChunkData). /// Pull (DrainFromAsync extension): helper drains a /// into the input via repeated -/// calls (typical for NamedPipe / FileStream). +/// calls (typical for NamedPipe / FileStream / NetworkStream). /// /// /// Backed by a single contiguous byte[] from . Positions reset @@ -57,16 +60,21 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable private int _readPos; // consumer reports consumed position here private bool _completed; + // Whether Feed() should strip [201][UINT16][data] chunked framing (true, default — matches + // SignalR-style multiplexed wire) or append bytes verbatim (false — matches the raw output + // of SerializeChunked / single-shot byte[] over PipeWriter). + private readonly bool _stripChunkFraming; + // Framing state machine — parses [201][UINT16 LE size][data] frames + [202] CHUNK_END. - // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's output; - // identical parsing logic to AcBinaryHubProtocol.TryParseChunkData but stream-stateful - // across Feed(span) boundaries (the SignalR side has ReadOnlySequence with rewind; - // we get arbitrary spans). + // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's framed + // output and SignalR's AsyncSegment chunked frame format. Only active when + // _stripChunkFraming = true. private const byte ChunkStart = 200; // CHUNK_START — tolerated, skipped private const byte ChunkData = 201; // CHUNK_DATA — header followed by [UINT16 size][data] private const byte ChunkEnd = 202; // CHUNK_END — signals end-of-stream private FramingState _framingState = FramingState.AwaitingHeader; + private int _sizeAccumulator; // partial UINT16 size during AwaitingSizeLow/High private int _bytesRemainingInChunk; // remaining data bytes in current CHUNK_DATA frame @@ -108,36 +116,54 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable /// 4 KB chunk size, 128 KB for the standalone 64 KB default). /// /// Initial buffer size. Rounded up by ArrayPool. - /// Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds. - public AsyncPipeReaderInput(int initialCapacity) + /// + /// true (default): parses [201][UINT16][data] chunked frames + + /// [202] end marker (matches framed output and + /// SignalR's AsyncSegment chunked wire format). + /// false: appends bytes verbatim — for raw byte streams (matches + /// AcBinarySerializer.SerializeChunked output and the single-shot + /// byte[] output). + /// + public AsyncPipeReaderInput(int initialCapacity, bool stripChunkFraming = true) { if (initialCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(initialCapacity)); _buffer = ArrayPool.Shared.Rent(initialCapacity); + _stripChunkFraming = stripChunkFraming; _dataAvailable = new ManualResetEventSlim(false); } // --- Producer API (push) --- /// - /// Feeds raw chunked-wire bytes (any combination of complete or partial - /// [201][UINT16 LE size][data] frames, optional [200] CHUNK_START prefix, - /// trailing [202] CHUNK_END). Strips framing internally; only the unwrapped - /// data bytes land in the consumer-visible buffer. - /// - /// State is preserved across Feed calls — partial frame headers, mid-size - /// boundaries, and mid-data boundaries all resume correctly on the next call. - /// - /// Wire format identical to output and to - /// SignalR's AsyncSegment chunked frame format. Unified across all transports per ADR-0003. - /// - /// On [202], sets the completion flag and signals waiting consumers — equivalent - /// to an external call. Bytes after [202] are ignored. + /// Feeds bytes into the consumer-visible buffer. Behavior is driven by the + /// stripChunkFraming ctor flag: + /// + /// stripChunkFraming = true (default): expects the chunked wire format + /// [201][UINT16 LE size][data] per chunk, tolerates [200] + /// CHUNK_START prefix, and signals end-of-stream on [202] CHUNK_END. State + /// is preserved across Feed calls — partial frame headers, mid-size boundaries, + /// and mid-data boundaries all resume correctly. On [202], sets the completion + /// flag and signals waiting consumers — equivalent to an external + /// call. Bytes after [202] are ignored. + /// stripChunkFraming = false: appends bytes verbatim — no wire-format + /// interpretation. The producer must pass only payload bytes (e.g. raw byte stream + /// drained from a paired with + /// AcBinarySerializer.SerializeChunked). + /// /// public void Feed(ReadOnlySpan data) { if (data.IsEmpty) return; + if (!_stripChunkFraming) + { + // Raw mode: append verbatim, no framing interpretation. + AppendToBuffer(data); + return; + } + + // Framed mode: state machine parses [201][UINT16 LE size][data] frames + [202] end marker. var i = 0; while (i < data.Length) { @@ -214,7 +240,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable } /// - /// Appends unwrapped data bytes to the internal buffer with sliding-window cycling + /// Appends data bytes to the internal buffer with sliding-window cycling /// (reset to 0 when consumer has caught up) and grow-as-last-resort. Signals the consumer. /// private void AppendToBuffer(ReadOnlySpan data) diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index 685ff94..f29d66f 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -1,26 +1,34 @@ -using System; using System.Buffers; using System.Buffers.Binary; using System.Diagnostics; -using System.IO; using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; -using System.Threading.Tasks; namespace AyCode.Core.Serializers.Binaries; /// -/// Binary output that writes to a PipeWriter with per-chunk self-describing framing. +/// Binary output that writes to a PipeWriter chunk-by-chunk via the PipeWriter's natural slabbing. +/// Two wire-format modes are supported, selected by the emitChunkFraming ctor flag: /// -/// Each chunk (including the last) is framed as [201][UINT16 size][data] with a 3-byte header -/// reserved at the start of each buffer. The serializer context writes into the space after the -/// reserved bytes; on , the header is patched and the full chunk is committed via -/// Advance (zero-copy). does the same for the last (partial) chunk. +/// +/// emitChunkFraming = false (raw): pure AcBinary bytes are written into +/// the PipeWriter's slabs and committed via Advance — no per-chunk header bytes appear on the +/// wire. Bit-compatible with the single-shot Serialize(value, opts) → byte[] output. +/// The receiver can deserialize the byte stream as-is (e.g. via +/// AcBinaryDeserializer.Deserialize(byte[]) after collecting, or any raw +/// PipeReader-based path). /// -/// The protocol layer writes a single [202] byte after all chunks to signal end-of-stream. +/// emitChunkFraming = true (framed): each chunk gets a 3-byte header +/// [201][UINT16 size][data]. The header is reserved at the start of each acquired slab; +/// the serializer writes data after it, and on commit the size is patched and the full chunk +/// is Advanced (zero-copy). The protocol layer writes a single [202] byte after all +/// chunks to signal end-of-stream. This is the multiplexed wire format used by SignalR's +/// BinaryProtocolMode.AsyncSegment and any custom multiplexed protocol where the +/// receiver needs incremental chunk-boundary detection. +/// /// -/// Backpressure modes (controlled by waitForFlush): +/// Backpressure modes (controlled by waitForFlush) — independent of framing: /// /// waitForFlush=true (default): Grow() waits for the previous FlushAsync before /// starting a new chunk. Pro: maximum pipeline parallelism, guaranteed end-to-end zero-copy. @@ -32,12 +40,18 @@ namespace AyCode.Core.Serializers.Binaries; /// for that chunk. /// /// +/// Flush strategy auto-selects on writer type — Stream-backed PipeWriters +/// (PipeWriter.Create(Stream)) run sequentially per chunk because of the +/// StreamPipeWriter._tailMemory reset race; Pipe-based and Kestrel transport writers +/// keep parallelism. Orthogonal to the framing flag: all four (framed/raw) × (sequential/parallel) +/// combinations work. +/// /// Timeout safety: every synchronous flush-await is bounded by flushTimeout /// (default when the type is used directly; /// passes 10 s from its options). A /// propagates to the caller, allowing the connection to abort instead of blocking forever. /// -/// Maximum chunk data size: 65535 bytes (UINT16 max). +/// Maximum chunk data size (in framed mode): 65535 bytes (UINT16 max). /// public struct AsyncPipeWriterOutput : IBinaryOutputBase { @@ -51,16 +65,21 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public const int MaxChunkSize = ushort.MaxValue; /// - /// Cached runtime type, discovered via the public + /// Cached runtime type, discovered via the public /// factory at class-load /// time (no magic strings, no reflection lookup, refactor-safe — if MS ever renames the /// internal type, this auto-tracks). The dummy instance is unreachable after class init /// and GC-collected; the static field retains only the reference. + /// + /// internal visibility — exposed for sanity-check tests in + /// AyCode.Core.Tests (verifies new Pipe().Writer.GetType() != StreamPipeWriterType + /// and that the runtime detect can never accidentally fire on Pipe-based writers). /// - private static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType(); + internal static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType(); private readonly PipeWriter _pipeWriter; private readonly int _chunkSize; + private readonly bool _emitChunkFraming; private readonly bool _waitForFlush; private readonly bool _serializeFlushAndAcquire; private readonly TimeSpan _flushTimeout; @@ -80,31 +99,39 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase [Conditional("DEBUG")] private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message); - /// Creates an output bound to the given PipeWriter with self-describing chunked framing. - /// Target pipe (typically Kestrel's transport output for SignalR). + /// Creates an output bound to the given PipeWriter — framed or raw mode per . + /// Target pipe (typically Kestrel's transport output for SignalR, NamedPipe, FileStream, or any custom ). /// Per-chunk data size (max ). Default 4 KB matches Kestrel's slab size. + /// true → write [201][UINT16][data] per-chunk header (multiplexed wire format). + /// false → raw AcBinary bytes only, byte-compatible with the single-shot byte[] output. See class summary. /// See class summary — pipeline parallelism (true) vs adaptive (false). /// Per-flush timeout. null /// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers. - public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true, TimeSpan? flushTimeout = null) + public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool emitChunkFraming = true, bool waitForFlush = true, TimeSpan? flushTimeout = null) { if (chunkSize > MaxChunkSize) - throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, - $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max)."); + throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max)."); _pipeWriter = pipeWriter; _chunkSize = chunkSize; + _emitChunkFraming = emitChunkFraming; _waitForFlush = waitForFlush; + // null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms). // A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller. - _flushTimeout = flushTimeout ?? System.Threading.Timeout.InfiniteTimeSpan; + _flushTimeout = flushTimeout ?? Timeout.InfiniteTimeSpan; + // StreamPipeWriter (PipeWriter.Create(Stream)) resets internal _tailMemory to default // at FlushAsync completion — racing with the AcquireChunk-during-flush parallelism this // class deliberately uses. For Stream-backed writers, fully await the just-started flush // before acquiring the next chunk's memory (the writer-correct usage pattern; flush is // a real I/O operation here). Pipe-based writers (Kestrel transport, SignalR) do NOT // reset state on flush completion → the parallelism feature stays intact for them. - _serializeFlushAndAcquire = pipeWriter.GetType() == StreamPipeWriterType; + // + // IsAssignableFrom (not ==) so any future BCL subclass of StreamPipeWriter automatically + // picks the safe sequential path — forward-compat, no silent breakage on .NET upgrades. + _serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter); + _committedBytes = 0; _ownedBuffer = false; _lastFlush = default; @@ -125,29 +152,31 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase var task = vt.AsTask(); if (!task.Wait(_flushTimeout)) - throw new TimeoutException( - $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + - "consumer may be too slow, stuck, or disconnected."); + throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected."); // Completed within timeout — propagate any faulted exception task.GetAwaiter().GetResult(); } /// - /// Provides the initial buffer from the PipeWriter with 3-byte header reservation. + /// Provides the initial buffer from the PipeWriter — in framed mode, the buffer's first 3 + /// bytes are reserved for the chunk header (filled at ); + /// in raw mode, no reservation, the data starts at position = 0. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Initialize(out byte[] buffer, out int position, out int bufferEnd) { _committedBytes = 0; _lastFlush = default; + AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); _currentChunkStart = position; } /// - /// Called when the context's buffer is full. Patches the chunk header [201][UINT16 size], - /// commits the chunk to the PipeWriter, and fires a background flush. + /// Called when the context's buffer is full. Commits the chunk to the PipeWriter (in framed + /// mode, patches the [201][UINT16 size] header before Advance; in raw mode, simply + /// Advances the data bytes), then fires a background flush and acquires the next chunk. /// [MethodImpl(MethodImplOptions.NoInlining)] public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) @@ -174,13 +203,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase // growth when the consumer is slow. // The conditional FlushAsync at the end avoids double-flush if the previous flush // is still in progress (waitForFlush=false skip path). - if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) - SyncAwaitFlush(_lastFlush); + if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); - if (_lastFlush.IsCompleted) - _lastFlush = _pipeWriter.FlushAsync(); + if (_lastFlush.IsCompleted) _lastFlush = _pipeWriter.FlushAsync(); } // Acquire new chunk with header reservation (common to both paths). @@ -195,9 +222,10 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart); /// - /// Commits the last (partial) chunk to the PipeWriter with [201][UINT16 size] header. - /// Zero-copy: patches the reserved header bytes and calls Advance — no data copying. - /// Does NOT flush to network — the protocol writes [202] and flushes after. + /// Commits the last (partial) chunk to the PipeWriter — in framed mode patches the + /// [201][UINT16 size] header before Advance, in raw mode simply Advances the data. + /// Zero-copy: no data copying. Does NOT flush to network — in framed mode the protocol writes + /// [202] and flushes after. /// public void Flush(byte[] buffer, int position) { @@ -213,7 +241,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public void Reset() { } /// - /// Patches [201][UINT16 dataBytes] into the reserved header and commits via Advance. + /// Commits the current chunk to the PipeWriter. In framed mode, patches the reserved + /// [201][UINT16 dataBytes] header before Advance; in raw mode, simply Advances the data. /// For owned buffers, copies to PipeWriter first. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -222,16 +251,25 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase var dataBytes = position - _currentChunkStart; if (dataBytes <= 0) return; - var headerStart = _currentChunkStart - HeaderSize; - buffer[headerStart] = ChunkDataMarker; - BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); + if (_emitChunkFraming) + { + var headerStart = _currentChunkStart - HeaderSize; + buffer[headerStart] = ChunkDataMarker; - EmitDiagnostic($"CommitCurrentChunk: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})"); + BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); - if (_ownedBuffer) - FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); + EmitDiagnostic($"CommitCurrentChunk[framed]: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})"); + + if (_ownedBuffer) FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); + else _pipeWriter.Advance(HeaderSize + dataBytes); + } else - _pipeWriter.Advance(HeaderSize + dataBytes); + { + EmitDiagnostic($"CommitCurrentChunk[raw]: dataBytes={dataBytes} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({dataBytes})"); + + if (_ownedBuffer) FlushOwnedBuffer(buffer, _currentChunkStart, dataBytes); + else _pipeWriter.Advance(dataBytes); + } _committedBytes += dataBytes; // only count data bytes, not framing } @@ -240,8 +278,10 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private void FlushOwnedBuffer(byte[] buffer, int start, int length) { var span = _pipeWriter.GetSpan(length); + buffer.AsSpan(start, length).CopyTo(span); _pipeWriter.Advance(length); + ArrayPool.Shared.Return(buffer); _ownedBuffer = false; } @@ -249,16 +289,20 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) { var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize); - var totalRequest = dataSize + HeaderSize; + + // Header reservation only in framed mode — raw mode skips it for byte-compat with the + // single-shot byte[] output (each chunk holds pure AcBinary bytes, no markers). + var headerOffset = _emitChunkFraming ? HeaderSize : 0; + var totalRequest = dataSize + headerOffset; var memory = _pipeWriter.GetMemory(totalRequest); - EmitDiagnostic($"AcquireChunk: requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}"); + EmitDiagnostic($"AcquireChunk: framed={_emitChunkFraming} requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}"); if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && segment.Array != null) { buffer = segment.Array; - position = segment.Offset + HeaderSize; - bufferEnd = segment.Offset + HeaderSize + dataSize; + position = segment.Offset + headerOffset; + bufferEnd = segment.Offset + headerOffset + dataSize; _ownedBuffer = false; EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]"); @@ -267,8 +311,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase { var owned = ArrayPool.Shared.Rent(totalRequest); buffer = owned; - position = HeaderSize; - bufferEnd = HeaderSize + dataSize; + position = headerOffset; + bufferEnd = headerOffset + dataSize; _ownedBuffer = true; EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]"); diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index f204adc..11a76f3 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -139,21 +139,21 @@ public class AcBinaryHubProtocol : IHubProtocol _options = options.SerializerOptions; _options.BufferWriterChunkSize = options.BufferSize; + _protocolMode = options.ProtocolMode; _logger = options.Logger; _waitForFlush = options.WaitForFlush; _flushTimeout = options.FlushTimeout; + Name = options.Name; + _chunkStates = new ConditionalWeakTable(); - if (_logger != null) - { - _logger.LogInformation( - "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", - Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, - _waitForFlush, _flushTimeout.TotalMilliseconds, - _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); - } + _logger?.LogInformation( + "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", + Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, + _waitForFlush, _flushTimeout.TotalMilliseconds, + _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); } /// @@ -191,12 +191,9 @@ public class AcBinaryHubProtocol : IHubProtocol var task = vt.AsTask(); - if (!task.Wait(_flushTimeout)) - throw new TimeoutException( - $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + - "consumer may be too slow, stuck, or disconnected."); - - return task.GetAwaiter().GetResult(); + return task.Wait(_flushTimeout) + ? task.GetAwaiter().GetResult() + : throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected."); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -501,10 +498,12 @@ public class AcBinaryHubProtocol : IHubProtocol chunkStartPayload = bw.Position + externalBytes; bw.Flush(); + Unsafe.WriteUnaligned(ref lengthSpan[0], chunkStartPayload); _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload); } + SyncFlush(pipeWriter.FlushAsync()); // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- @@ -518,6 +517,7 @@ public class AcBinaryHubProtocol : IHubProtocol var endByte = pipeWriter.GetSpan(1); endByte[0] = MsgAsyncChunkEnd; pipeWriter.Advance(1); + SyncFlush(pipeWriter.FlushAsync()); _logger?.LogTrace("WriteMessageChunked CHUNK_END written"); diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs index 0925fad..c20f428 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs @@ -37,7 +37,7 @@ public sealed class AcBinaryHubProtocolOptions /// /// Ignored for Bytes and Segment modes. /// - public bool WaitForFlush { get; set; } = true; + public bool WaitForFlush { get; set; } = false; /// /// Maximum wait for a single synchronous FlushAsync before throwing