diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs index d07f894..104f648 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs @@ -13,7 +13,7 @@ namespace AyCode.Core.Tests.Serialization; /// The serializer/deserializer surface intentionally has NO NamedPipe-specific helpers — /// the tests own the / /// lifecycle directly and call the generic -/// + +/// + /// primitives. This proves /// the streaming framework works on arbitrary PipeWriter/PipeReader sources /// (NamedPipe, FileStream, NetworkStream, custom transports) without per-transport adapters in @@ -29,6 +29,7 @@ public class AcBinarySerializerNamedPipeTests public async Task RoundTrip_SmallChunkSize_PayloadEquals() { var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}"; + // 256-byte chunk size = Kestrel slab default; small enough to force multi-chunk framing // for our 50-item payload, exercises the AsyncSegment chunked wire format end-to-end. var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 }; @@ -51,6 +52,7 @@ public class AcBinarySerializerNamedPipeTests #if DEBUG // Capture BOTH receiver and sender state to diagnose StreamPipeWriter interaction if needed. var diagLogs = new List(); + AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}"); AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}"); #endif @@ -71,6 +73,7 @@ public class AcBinarySerializerNamedPipeTests // Deep structure: count items + pallets + measurements + points must match exactly var origCounts = CountTestOrderHierarchy(original); var resultCounts = CountTestOrderHierarchy(result); + Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch"); Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch"); Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch"); @@ -81,15 +84,17 @@ public class AcBinarySerializerNamedPipeTests #if DEBUG AsyncPipeReaderInput.DiagnosticLog = null; AsyncPipeWriterOutput.DiagnosticLog = null; + if (diagLogs.Count > 0) { Console.WriteLine($"=== Sender [S] + Receiver [R] DiagnosticLog trail ({diagLogs.Count} entries) ==="); + // Print last 60 entries (most relevant to failure point) var startIdx = Math.Max(0, diagLogs.Count - 60); - if (startIdx > 0) - Console.WriteLine($" ... ({startIdx} earlier entries elided)"); - for (var i = startIdx; i < diagLogs.Count; i++) - Console.WriteLine($" [{i}] {diagLogs[i]}"); + if (startIdx > 0) Console.WriteLine($" ... ({startIdx} earlier entries elided)"); + + for (var i = startIdx; i < diagLogs.Count; i++) Console.WriteLine($" [{i}] {diagLogs[i]}"); + Console.WriteLine($"=== End DiagnosticLog ==="); } #endif @@ -98,7 +103,7 @@ public class AcBinarySerializerNamedPipeTests /// /// Owns the full NamedPipe lifecycle: binds server, accepts connect, drives the generic - /// on + /// on /// the client side and /// on the server side. The framework helpers know nothing about NamedPipe — only PipeWriter / /// PipeReader. @@ -107,7 +112,7 @@ public class AcBinarySerializerNamedPipeTests { // Server-side bind is synchronous (NamedPipeServerStream ctor registers the pipe with // the OS), so the client can immediately attempt connect once we hand off to async. - await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); + await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Message, System.IO.Pipes.PipeOptions.Asynchronous); var receiveTask = Task.Run(async () => { @@ -123,9 +128,11 @@ public class AcBinarySerializerNamedPipeTests var pipeWriter = PipeWriter.Create(pipeClient); try { - // Public PipeWriter overload — auto-selects sequential flush strategy because - // PipeWriter.Create(stream) returns StreamPipeWriter (race-incompatible with parallel send). - AcBinarySerializer.Serialize(original, pipeWriter, opts); + // Public PipeWriter overload (raw chunked stream — no per-chunk frame headers, + // bit-compatible with Serialize(v, opts) byte[] output). Auto-selects sequential + // flush strategy because PipeWriter.Create(stream) returns StreamPipeWriter + // (race-incompatible with parallel send). + AcBinarySerializer.SerializeChunked(original, pipeWriter, opts); } finally { @@ -155,7 +162,7 @@ public class AcBinarySerializerNamedPipeTests // Note: a "default chunk size" test was deliberately omitted. The default // AcBinarySerializerOptions.BufferWriterChunkSize used to be 65536, which exceeded the - // UINT16 max (65535). Fixed in this work to 65535. Tests above explicitly set chunk size + // UINT16 max (256). Fixed in this work to 256. Tests above explicitly set chunk size // for reproducibility regardless of default. private static TestParentWithDateTimeItemCollection CreatePayload(int itemCount) diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs index 2435998..4543bd5 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParallelTests.cs @@ -1,5 +1,6 @@ using AyCode.Core.Serializers.Binaries; using AyCode.Core.Tests.TestModels; +using System.IO; using System.IO.Pipelines; using static AyCode.Core.Tests.TestModels.AcSerializerModels; @@ -8,17 +9,18 @@ namespace AyCode.Core.Tests.Serialization; /// /// Unit tests for (Step 1, ACCORE-BIN-T-D6H4) and the /// extension (Step 2, ACCORE-BIN-T-M2K1), -/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9). +/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9), plus runtime type-detect +/// sanity pinning (Step 4). /// -/// The receiver-side is framing-aware: it -/// expects the AsyncSegment chunked wire format [201][UINT16 LE size][data] per chunk, -/// tolerates [200] CHUNK_START prefix, and signals end-of-stream on [202] -/// CHUNK_END. The helper wraps test data into single chunk -/// frames; multi-chunk tests concatenate multiple frames. +/// Tests run with 's default stripChunkFraming = true — +/// expects the AsyncSegment chunked wire format +/// [201][UINT16 LE size][data] per chunk, tolerates [200] CHUNK_START prefix, and +/// signals end-of-stream on [202] CHUNK_END. The helper +/// wraps test data into single chunk frames; multi-chunk tests concatenate multiple frames. /// -/// Wire format identical to output and to SignalR's -/// AcBinaryHubProtocol.TryParseChunkData input — unified across all transports per -/// ADR-0003 §9. +/// Wire format identical to framed output and to +/// SignalR's AcBinaryHubProtocol.TryParseChunkData input — unified across all transports +/// per ADR-0003 §9. /// [TestClass] public class AcBinarySerializerPipeParallelTests @@ -72,6 +74,7 @@ public class AcBinarySerializerPipeParallelTests public void Initialize_AfterFeed_ReturnsAvailableData() { using var input = new AsyncPipeReaderInput(64); + var data = new byte[] { 10, 20, 30 }; input.Feed(WrapInChunkFrame(data)); @@ -88,7 +91,8 @@ public class AcBinarySerializerPipeParallelTests public void Complete_AllConsumed_TryAdvanceSegmentReturnsFalse() { using var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); + + input.Feed(WrapInChunkFrame([1, 2, 3])); input.Complete(); // Simulate consumer that has read all 3 bytes @@ -103,16 +107,18 @@ public class AcBinarySerializerPipeParallelTests public void Complete_WithLeftoverData_TryAdvanceSegmentReturnsTrueWithRemainder() { using var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); - input.Feed(WrapInChunkFrame(new byte[] { 4, 5, 6 })); + + input.Feed(WrapInChunkFrame([1, 2, 3])); + input.Feed(WrapInChunkFrame([4, 5, 6])); input.Complete(); // Simulate consumer that has read 3 of 6 bytes — advance should expose the rest input.Initialize(out var buffer, out var position, out var bufferLength); Assert.AreEqual(6, bufferLength); - position = 3; + position = 3; var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1); + Assert.IsTrue(hasMore); Assert.AreEqual(3, position); Assert.AreEqual(6, bufferLength); @@ -126,6 +132,7 @@ public class AcBinarySerializerPipeParallelTests { // Initial capacity = 16, feed > 16 bytes consecutively (no consume between) → forces grow using var input = new AsyncPipeReaderInput(16); + var data = new byte[64]; for (var i = 0; i < data.Length; i++) data[i] = (byte)i; @@ -159,6 +166,7 @@ public class AcBinarySerializerPipeParallelTests while (offset < expected.Length) { var take = Math.Min(chunkSize, expected.Length - offset); + input.Feed(WrapInChunkFrame(expected, offset, take)); offset += take; } @@ -184,6 +192,7 @@ public class AcBinarySerializerPipeParallelTests using var input = new AsyncPipeReaderInput(32); var expected = new byte[totalBytes]; + for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF); var consumeTask = Task.Run(() => ConsumeAll(input)); @@ -210,6 +219,7 @@ public class AcBinarySerializerPipeParallelTests await Task.WhenAll(consumeTask, produceTask); var actual = consumeTask.Result; + Assert.AreEqual(expected.Length, actual.Length); CollectionAssert.AreEqual(expected, actual); } @@ -218,7 +228,7 @@ public class AcBinarySerializerPipeParallelTests public void Dispose_DoesNotThrow() { var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); + input.Feed(WrapInChunkFrame([1, 2, 3])); input.Complete(); input.Dispose(); @@ -237,12 +247,12 @@ public class AcBinarySerializerPipeParallelTests // Verifies the framing state machine survives partial frame headers / sizes / data // split across multiple Feed calls. using var input = new AsyncPipeReaderInput(64); + var data = new byte[] { 10, 20, 30, 40, 50 }; var frame = WrapInChunkFrame(data); // 8 bytes total: [201][05][00][10][20][30][40][50] // Feed byte-by-byte to stress the state machine - for (var i = 0; i < frame.Length; i++) - input.Feed(frame.AsSpan(i, 1)); + for (var i = 0; i < frame.Length; i++) input.Feed(frame.AsSpan(i, 1)); input.Complete(); var consumed = ConsumeAll(input); @@ -254,15 +264,17 @@ public class AcBinarySerializerPipeParallelTests { // [202] CHUNK_END alone (without external Complete()) should signal end-of-stream. using var input = new AsyncPipeReaderInput(64); - input.Feed(WrapInChunkFrame(new byte[] { 1, 2, 3 })); - input.Feed(new byte[] { 202 }); // CHUNK_END marker only — no external Complete() + + input.Feed(WrapInChunkFrame([1, 2, 3])); + input.Feed([202]); // CHUNK_END marker only — no external Complete() // Should observe completion: TryAdvanceSegment returns false on empty after consume input.Initialize(out var buffer, out var position, out var bufferLength); Assert.AreEqual(3, bufferLength); - position = bufferLength; + position = bufferLength; var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1); + Assert.IsFalse(hasMore); } @@ -272,7 +284,7 @@ public class AcBinarySerializerPipeParallelTests using var input = new AsyncPipeReaderInput(64); // Byte 0x42 is not 200/201/202 — should throw - _ = Assert.ThrowsExactly(() => input.Feed(new byte[] { 0x42 })); + _ = Assert.ThrowsExactly(() => input.Feed([0x42])); } // ==================================================================== @@ -285,8 +297,7 @@ public class AcBinarySerializerPipeParallelTests var pipe = new Pipe(); await pipe.Writer.CompleteAsync(); - await Assert.ThrowsExactlyAsync(async () => - await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader)); + await Assert.ThrowsExactlyAsync(async () => await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader)); } [TestMethod] @@ -294,8 +305,7 @@ public class AcBinarySerializerPipeParallelTests { using var input = new AsyncPipeReaderInput(64); - await Assert.ThrowsExactlyAsync(async () => - await input.DrainFromAsync(null!)); + await Assert.ThrowsExactlyAsync(async () => await input.DrainFromAsync(null!)); } [TestMethod] @@ -390,12 +400,12 @@ public class AcBinarySerializerPipeParallelTests // ==================================================================== // Step 3 — Real parallel pipeline test (ACCORE-BIN-T-V7C9) // - // True 3-task pipeline: AcBinarySerializer writes to pipe.Writer chunk-by-chunk via - // AsyncPipeWriterOutput (under the hood) — drainer pulls from pipe.Reader via - // DrainFromAsync — deserializer reads from AsyncPipeReaderInput. All three run - // concurrently with TRUE serialize↔deserialize overlap (the serializer is still writing - // the tail of the message while the deserializer has already consumed the head, courtesy - // of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput). + // True 3-task pipeline: AcBinarySerializer writes framed chunks to pipe.Writer via + // AsyncPipeWriterOutput (framed mode under the hood) — drainer pulls from pipe.Reader + // via DrainFromAsync — deserializer reads from AsyncPipeReaderInput (framing-aware Feed). + // All three run concurrently with TRUE serialize↔deserialize overlap (the serializer is + // still writing the tail of the message while the deserializer has already consumed the + // head, courtesy of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput). // // BufferWriterChunkSize = 256 → small payloads cross multiple [201][UINT16][data] chunk // boundaries on the wire, exercising the framing-aware AsyncPipeReaderInput.Feed state @@ -411,8 +421,7 @@ public class AcBinarySerializerPipeParallelTests var pipe = new Pipe(); using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); - var deserTask = Task.Run(() => - AcBinaryDeserializer.Deserialize(input, opts)); + var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, opts)); var drainTask = input.DrainFromAsync(pipe.Reader); @@ -420,9 +429,10 @@ public class AcBinarySerializerPipeParallelTests { try { - // PipeWriter overload — writes chunked AsyncSegment framing ([201][UINT16][data]). - // AsyncPipeReaderInput.Feed strips framing internally on the receive side. - AcBinarySerializer.Serialize(original, pipe.Writer, opts); + // SerializeChunkedFramed — writes [201][UINT16][data] per chunk on the wire. + // AsyncPipeReaderInput.Feed strips framing internally on the receive side + // (default stripChunkFraming = true). + AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); } finally { @@ -456,7 +466,7 @@ public class AcBinarySerializerPipeParallelTests var drainTask = input.DrainFromAsync(pipe.Reader); var serTask = Task.Run(async () => { - try { AcBinarySerializer.Serialize(original, pipe.Writer, opts); } + try { AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); } finally { await pipe.Writer.CompleteAsync(); } }); @@ -471,6 +481,7 @@ public class AcBinarySerializerPipeParallelTests var origCounts = CountTestOrderHierarchy(original); var resultCounts = CountTestOrderHierarchy(result); + Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch"); Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch"); Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch"); @@ -479,7 +490,7 @@ public class AcBinarySerializerPipeParallelTests private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder order) { - int items = order.Items.Count; + var items = order.Items.Count; int pallets = 0, measurements = 0, points = 0; foreach (var item in order.Items) { @@ -487,13 +498,84 @@ public class AcBinarySerializerPipeParallelTests foreach (var p in item.Pallets) { measurements += p.Measurements.Count; - foreach (var m in p.Measurements) - points += m.Points.Count; + points += p.Measurements.Sum(m => m.Points.Count); } } return (items, pallets, measurements, points); } + // ==================================================================== + // Step 4 — AsyncPipeWriterOutput runtime type detect — sanity pinning + // ==================================================================== + // + // Guards the architectural assumption that PipeWriter.Create(Stream).GetType() resolves to a + // different runtime type than new Pipe().Writer.GetType(). This is what makes + // AsyncPipeWriterOutput._serializeFlushAndAcquire auto-select between sequential + // (Stream-backed) and parallel (Pipe-based) flush strategies safe — without touching internal + // BCL type names directly. If a future .NET unifies the two writer impls or renames the + // internal type in a way that breaks the detect, these tests fail before prod. + + [TestMethod] + public void StreamPipeWriter_AndPipeWriter_AreDistinctTypes() + { + var pipeBased = new Pipe().Writer.GetType(); + var streamBased = PipeWriter.Create(Stream.Null).GetType(); + + // Cornerstone of the runtime detect — must NEVER unify, else _serializeFlushAndAcquire + // would either always-true or always-false, both of which break correctness. + Assert.AreNotEqual(pipeBased, streamBased, + $"Runtime types unified — pipe-based and stream-backed PipeWriter must remain distinct. " + + $"pipeBased={pipeBased.FullName}, streamBased={streamBased.FullName}"); + + // Living documentation — typenames printed for debugging on future .NET upgrades. + Console.WriteLine($"Pipe.Writer typename: {pipeBased.FullName}"); + Console.WriteLine($"PipeWriter.Create(Stream) typename: {streamBased.FullName}"); + } + + [TestMethod] + public void StreamPipeWriterTypeField_MatchesFactoryResult() + { + // The static field caches the StreamPipeWriter type via PipeWriter.Create(Stream.Null).GetType() + // at class-load time. A second call to the factory MUST yield the same Type instance — + // otherwise the cache is stale and the runtime detect mis-classifies all stream writers. + var freshType = PipeWriter.Create(Stream.Null).GetType(); + + Assert.AreSame(freshType, AsyncPipeWriterOutput.StreamPipeWriterType, + "Cached StreamPipeWriterType differs from a fresh factory result — the BCL is " + + "behaving non-deterministically (or the test was loaded before AsyncPipeWriterOutput)."); + } + + [TestMethod] + public void IsAssignableFrom_PipeBasedWriter_ReturnsFalse() + { + // The Pipe.Writer impl must NOT be a StreamPipeWriter (or subclass thereof) — else + // sequential mode would be wrongly selected and we'd lose the parallelism feature. + var pipeBasedType = new Pipe().Writer.GetType(); + + Assert.IsFalse(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(pipeBasedType), + $"Pipe.Writer typename={pipeBasedType.FullName} is unexpectedly a StreamPipeWriter " + + $"(or subclass) — runtime detect would mis-classify it as sequential."); + } + + [TestMethod] + public void IsAssignableFrom_StreamBackedWriters_ReturnsTrue() + { + // PipeWriter.Create(stream) must always yield a StreamPipeWriter (or subclass) — + // even for unusual stream types (file, memory, null). + Type[] writerTypes = + [ + PipeWriter.Create(Stream.Null).GetType(), + PipeWriter.Create(new MemoryStream()).GetType(), + ]; + + foreach (var t in writerTypes) + { + Assert.IsTrue(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(t), + $"PipeWriter.Create() returned typename={t.FullName} which is not " + + $"assignable to StreamPipeWriterType — the BCL changed its factory contract."); + } + } + // ==================================================================== // Test helpers // ==================================================================== @@ -502,15 +584,16 @@ public class AcBinarySerializerPipeParallelTests /// Wraps a raw payload in a single AsyncSegment chunk frame: [201][UINT16 LE size][data]. /// Matches the wire format produced by per chunk. /// - private static byte[] WrapInChunkFrame(byte[] data) - => WrapInChunkFrame(data, 0, data.Length); + private static byte[] WrapInChunkFrame(byte[] data) => WrapInChunkFrame(data, 0, data.Length); private static byte[] WrapInChunkFrame(byte[] data, int offset, int length) { var result = new byte[3 + length]; + result[0] = 201; // CHUNK_DATA marker result[1] = (byte)(length & 0xFF); // UINT16 LE size, low byte result[2] = (byte)((length >> 8) & 0xFF); // UINT16 LE size, high byte + Array.Copy(data, offset, result, 3, length); return result; } diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index ecce44e..fe9f8fb 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -48,8 +48,7 @@ public static partial class AcBinaryDeserializer internal static void Register(Type type, IGeneratedBinaryReader reader) => Readers[type] = reader; [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static IGeneratedBinaryReader? TryGet(Type type) => - Readers.TryGetValue(type, out var reader) ? reader : null; + internal static IGeneratedBinaryReader? TryGet(Type type) => Readers.GetValueOrDefault(type); } /// @@ -119,18 +118,15 @@ public static partial class AcBinaryDeserializer readers[BinaryTypeCode.ByteArray] = static (ctx, _, _) => ReadByteArray(ctx); // Register FixStr readers - for (byte code = BinaryTypeCode.FixStrBase; code <= BinaryTypeCode.FixStrMax; code++) + for (var code = BinaryTypeCode.FixStrBase; code <= BinaryTypeCode.FixStrMax; code++) { var length = BinaryTypeCode.DecodeFixStrLength(code); readers[code] = CreateFixStrReader(length); } // Register FixObj slot readers (0..SlotCount-1) - for (int slot = 0; slot < BinaryTypeCode.SlotCount; slot++) - { - readers[slot] = CreateFixObjReader(slot); - } - + for (var slot = 0; slot < BinaryTypeCode.SlotCount; slot++) readers[slot] = CreateFixObjReader(slot); + return readers; } } @@ -140,11 +136,9 @@ public static partial class AcBinaryDeserializer /// Creates a reader for FixStr with the given length. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static TypeReader CreateFixStrReader(int length) - where TInput : struct, IBinaryInputBase + private static TypeReader CreateFixStrReader(int length) where TInput : struct, IBinaryInputBase { - if (length == 0) - return static (_, _, _) => string.Empty; + if (length == 0) return static (_, _, _) => string.Empty; return (ctx, _, _) => ctx.ReadStringUtf8(length); } @@ -153,13 +147,12 @@ public static partial class AcBinaryDeserializer /// Creates a reader for FixObj slot (0..SlotCount-1). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static TypeReader CreateFixObjReader(int slot) - where TInput : struct, IBinaryInputBase + private static TypeReader CreateFixObjReader(int slot) where TInput : struct, IBinaryInputBase { return (ctx, targetType, depth) => ReadObjectFromSlot(ctx, slot, targetType, depth); } - private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true); + //private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true); #region Public API @@ -184,6 +177,7 @@ public static partial class AcBinaryDeserializer var context = DeserializationContextPool.Get(options); context.InitInput(new ArrayBinaryInput(data)); + try { return (T?)DeserializeCore(context, targetType); } finally { DeserializationContextPool.Return(context); } } @@ -193,8 +187,7 @@ public static partial class AcBinaryDeserializer /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static T? Deserialize(byte[] data, int offset, int length) - => Deserialize(data, offset, length, AcBinarySerializerOptions.Default); + public static T? Deserialize(byte[] data, int offset, int length) => Deserialize(data, offset, length, AcBinarySerializerOptions.Default); /// /// Deserialize binary data to object of type T from a sub-range with options. @@ -206,11 +199,11 @@ public static partial class AcBinaryDeserializer if (length == 1 && data[offset] == BinaryTypeCode.Null) return default; var targetType = typeof(T); - if (AcSerializerCommon.IsExpressionType(targetType)) - return (T?)(object?)DeserializeExpression(data, offset, length, targetType, options); + if (AcSerializerCommon.IsExpressionType(targetType)) return (T?)(object?)DeserializeExpression(data, offset, length, targetType, options); var context = DeserializationContextPool.Get(options); context.InitInput(new ArrayBinaryInput(data, offset, length)); + try { return (T?)DeserializeCore(context, targetType); } finally { DeserializationContextPool.Return(context); } } @@ -218,14 +211,12 @@ public static partial class AcBinaryDeserializer /// /// Deserialize binary data to specified type. /// - public static object? Deserialize(byte[] data, Type targetType) - => Deserialize(data, 0, data.Length, targetType, AcBinarySerializerOptions.Default); + public static object? Deserialize(byte[] data, Type targetType) => Deserialize(data, 0, data.Length, targetType, AcBinarySerializerOptions.Default); /// /// Deserialize binary data to specified type with options. /// - public static object? Deserialize(byte[] data, Type targetType, AcBinarySerializerOptions options) - => Deserialize(data, 0, data.Length, targetType, options); + public static object? Deserialize(byte[] data, Type targetType, AcBinarySerializerOptions options) => Deserialize(data, 0, data.Length, targetType, options); /// /// Deserialize binary data to specified type from a sub-range. @@ -332,16 +323,21 @@ public static partial class AcBinaryDeserializer /// Transport-agnostic: works with any PipeReader source — NamedPipe IPC /// (PipeReader.Create(namedPipeServerStream)), file-stream /// (PipeReader.Create(fileStream)), TCP (PipeReader.Create(networkStream)), - /// or custom PipeReader implementations. Strips the [201][UINT16 size][data] - /// chunked framing internally via . + /// or custom PipeReader implementations. Reads raw AcBinary bytes verbatim from + /// the pipe — no wire-format unwrapping. Pair with the producer-side + /// + /// (or its overload), which writes the same raw byte + /// stream as 's + /// byte[] output. /// /// Receive buffer initial capacity is derived from options.BufferWriterChunkSize × 2 /// — two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the /// message's lifetime regardless of total payload size. /// - /// For the producer side: see - /// - /// or . + /// For the multiplexed wire format (per-chunk [201][UINT16][data] headers, + /// produced by SerializeChunkedFramed or SignalR's AsyncSegment mode): the parser + /// strips framing on its own (e.g. AcBinaryHubProtocol.TryParseChunkData) and feeds + /// only the data bytes here. /// /// Source pipe reader. Caller owns lifecycle (creation + completion). /// Serializer options. Defaults to . @@ -354,7 +350,11 @@ public static partial class AcBinaryDeserializer var opts = options ?? AcBinarySerializerOptions.Default; - using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); + // Raw mode (stripChunkFraming: false) — bytes drained from the PipeReader are forwarded + // verbatim to the deserialization buffer. Pair with AcBinarySerializer.SerializeChunked + // (raw byte stream) on the producer side; for chunked-framed wire formats the parser + // strips framing upstream and feeds only data bytes here. + using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2, stripChunkFraming: false); var deserTask = Task.Run(() => Deserialize(input, opts), ct); await input.DrainFromAsync(reader, ct).ConfigureAwait(false); @@ -1654,7 +1654,7 @@ public static partial class AcBinaryDeserializer if (targetType.IsArray) { var array = Array.CreateInstance(elementType, count); - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var value = ReadValue(context, elementType, nextDepth); array.SetValue(value, i); @@ -1681,7 +1681,7 @@ public static partial class AcBinaryDeserializer try { - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var value = ReadValue(context, elementType, nextDepth); list.Add(value); @@ -1706,7 +1706,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, IntType)) { var array = new int[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (BinaryTypeCode.IsTinyInt(typeCode)) @@ -1724,7 +1724,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DoubleType)) { var array = new double[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Float64) return null; @@ -1738,7 +1738,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, LongType)) { var array = new long[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (BinaryTypeCode.IsTinyInt(typeCode)) @@ -1758,7 +1758,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, BoolType)) { var array = new bool[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode == BinaryTypeCode.True) array[i] = true; @@ -1773,7 +1773,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, GuidType)) { var array = new Guid[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Guid) return null; @@ -1787,7 +1787,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DecimalType)) { var array = new decimal[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Decimal) return null; @@ -1801,7 +1801,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DateTimeType)) { var array = new DateTime[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.DateTime) return null; @@ -1815,7 +1815,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, FloatType)) { var array = new float[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Float32) return null; @@ -1829,7 +1829,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, ShortType)) { var array = new short[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Int16) return null; @@ -1843,7 +1843,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, UShortType)) { var array = new ushort[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.UInt16) return null; @@ -1857,7 +1857,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, UIntType)) { var array = new uint[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.UInt32) return null; @@ -1871,7 +1871,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, ULongType)) { var array = new ulong[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.UInt64) return null; @@ -1885,7 +1885,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, SByteType)) { var array = new sbyte[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Int8) return null; @@ -1899,7 +1899,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, CharType)) { var array = new char[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.Char) return null; @@ -1913,7 +1913,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, DateTimeOffsetType)) { var array = new DateTimeOffset[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.DateTimeOffset) return null; @@ -1927,7 +1927,7 @@ public static partial class AcBinaryDeserializer if (ReferenceEquals(elementType, TimeSpanType)) { var array = new TimeSpan[count]; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var typeCode = context.ReadByte(); if (typeCode != BinaryTypeCode.TimeSpan) return null; @@ -1964,7 +1964,7 @@ public static partial class AcBinaryDeserializer var dict = (IDictionary)Activator.CreateInstance(dictType, count)!; var nextDepth = depth + 1; - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { var key = ReadValue(context, keyType, nextDepth); var value = ReadValue(context, valueType, nextDepth); @@ -2196,7 +2196,7 @@ public static partial class AcBinaryDeserializer where TInput : struct, IBinaryInputBase { var count = (int)context.ReadVarUInt(); - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { SkipValue(context, metaData); } @@ -2206,7 +2206,7 @@ public static partial class AcBinaryDeserializer where TInput : struct, IBinaryInputBase { var count = (int)context.ReadVarUInt(); - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) { SkipValue(context, metaData); // key SkipValue(context, metaData); // value diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 1a95d48..f635f95 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -423,21 +423,17 @@ public static partial class AcBinarySerializer } /// - /// Serialize to a with chunked protocol framing via - /// — gives the caller full - /// + control because - /// is always the BCL PipeWriterImpl, which is parallel-capable (no _tailMemory - /// reset race like StreamPipeWriter). + /// Serialize to a as a chunked stream — pure AcBinary + /// bytes are written via in raw mode (no per-chunk header). + /// The output is byte-compatible with 's + /// byte[] result; a consumer can drain pipe.Reader and feed the bytes directly to + /// (or pipe-them through DeserializeFromPipeReaderAsync) + /// with no extra parser. /// - /// Each chunk (including the last) is framed as [201][UINT16 size][data] and - /// committed to pipe.Writer via Advance (zero-copy). A consumer drains - /// pipe.Reader on a background task and writes to the actual transport. - /// - /// Use this overload when you constructed new Pipe() yourself and need - /// runtime tuning of the flush strategy. For arbitrary - /// (Kestrel transport output, PipeWriter.Create(stream), custom writers), use the - /// - /// overload. + /// Why instead of ? + /// Pipe.Writer is always the BCL PipeWriterImpl, which is parallel-capable + /// (no _tailMemory reset race like StreamPipeWriter). This overload exposes the + /// + tuning safely. /// /// The value to serialize; null writes a single null marker. /// Target pipe — caller drains pipe.Reader elsewhere. @@ -452,49 +448,108 @@ public static partial class AcBinarySerializer /// Per-flush timeout. null → wait forever. Positive value: throws /// on stuck consumers. /// - /// Total serialized data bytes (excluding framing overhead). - public static int Serialize(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + /// Total serialized bytes written. + public static int SerializeChunked(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) { if (pipe is null) throw new ArgumentNullException(nameof(pipe)); - return Serialize(value, pipe.Writer, options, waitForFlush, flushTimeout); + return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: false); } /// - /// Serialize to any with chunked protocol framing - /// via . Each chunk (including the last) is framed as - /// [201][UINT16 size][data] and committed to the PipeWriter via Advance (zero-copy). + /// Serialize to any as a chunked stream — pure + /// AcBinary bytes, no per-chunk header. The output is byte-compatible with + /// 's byte[] result. /// - /// Flush strategy is auto-selected by writer type: - /// StreamPipeWriter (from PipeWriter.Create(stream) — NamedPipe, FileStream, - /// NetworkStream, etc.) runs sequentially per chunk because the BCL impl resets - /// _tailMemory on flush completion (race-incompatible with parallel send). All other - /// PipeWriter implementations (Kestrel transport, custom impls) run with the safe - /// waitForFlush=true default — max parallelism, zero-alloc. + /// Flush strategy auto-selected by writer type: StreamPipeWriter + /// (PipeWriter.Create(stream) — NamedPipe / FileStream / NetworkStream / etc.) runs + /// sequentially per chunk because the BCL impl resets _tailMemory on flush completion + /// (race-incompatible with parallel send). Other PipeWriter implementations (Kestrel transport, + /// custom impls) run with the safe waitForFlush=true default — max parallelism, zero-alloc. /// - /// Need runtime tuning of the flush strategy? If you control the pipe yourself, - /// build a instance and use the - /// - /// overload — only Pipe-based writers can guarantee parallel-capable flush behavior. + /// Need runtime tuning of the flush strategy? Build a + /// instance and use + /// + /// — only Pipe-based writers can guarantee parallel-capable flush behavior. + /// + /// Need a multiplexed wire format with per-chunk frame headers? See + /// . /// /// The value to serialize; null writes a single null marker. /// Target pipe writer. /// Serializer options (type wrappers, reference handling, interning, etc.). - /// Total serialized data bytes (excluding framing overhead). - public static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) - => Serialize(value, pipeWriter, options, waitForFlush: true, flushTimeout: null); + /// Total serialized bytes written. + public static int SerializeChunked(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: false); /// - /// Internal flush-tunable PipeWriter overload — only callable from AyCode.Services - /// (SignalR hub protocol) because external callers cannot safely choose - /// without knowing the concrete implementation. - /// SignalR uses Kestrel transport output, which is parallel-capable, and forwards the - /// hub-protocol-options-configured tuning here. + /// Serialize a value into a chunked stream where each chunk carries a self-describing + /// frame header — [201][UINT16 size][data] per chunk, with a final [202] + /// end-of-stream marker. The frame headers let the receiver detect chunk boundaries + /// incrementally without knowing the total payload size up front, and let multiple + /// independent messages share a single transport with reliable separation. /// - /// For the public API, see the overload (parallel-capable, - /// tuning paramters available) or the simple overload - /// (auto-selects strategy, no tuning). + /// Use this when building a multiplexed wire protocol where several logical + /// messages are interleaved on one stream, when the receiver needs to start deserializing + /// as bytes arrive (pipeline parallelism — serialize / network / deserialize overlap), or + /// when the upper layer needs to dispatch each chunk independently. Typical scenarios: + /// real-time RPC, custom Hub-style protocols, event stream multiplexing. + /// + /// Concrete example: SignalR's BinaryProtocolMode.AsyncSegment uses + /// this exact wire format to interleave many HubMessages over a single connection. + /// + /// Need a simpler streaming output without per-chunk metadata? Use + /// + /// — bit-compatible with 's + /// byte[] output, no extra parser needed on the receive side. + /// + /// The value to serialize; null writes a single null marker. + /// Target pipe — caller drains pipe.Reader elsewhere. + /// Serializer options. + /// See . + /// See . + /// Total serialized data bytes (excluding framing overhead). + public static int SerializeChunkedFramed(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + { + if (pipe is null) throw new ArgumentNullException(nameof(pipe)); + return SerializeToPipeWriterCore(value, pipe.Writer, options, waitForFlush, flushTimeout, emitChunkFraming: true); + } + + /// + /// Serialize to any with per-chunk frame headers + /// (multiplexed wire format). See + /// + /// for the wire format details and use-cases. + /// + /// Flush strategy auto-selected by writer type — see + /// . + /// + public static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush: true, flushTimeout: null, emitChunkFraming: true); + + /// + /// Internal flush-tunable framed PipeWriter overload — used by AyCode.Services + /// (SignalR hub protocol) on Kestrel transport output, which is parallel-capable. External + /// callers should use the overload to safely tune + /// on a guaranteed parallel-capable writer. + /// + internal static int SerializeChunkedFramed(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true); + + /// + /// Internal legacy alias for + /// — kept until the SignalR hub protocol (AcBinaryHubProtocol.cs) is migrated to the + /// new name in a separate, isolated step. Identical behavior to SerializeChunkedFramed + /// (framed wire format with [201][UINT16][data] per chunk + [202] end marker). /// internal static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) + => SerializeToPipeWriterCore(value, pipeWriter, options, waitForFlush, flushTimeout, emitChunkFraming: true); + + /// + /// Common pipe-output serialization core. Same loop for both raw () + /// and framed () modes — the only difference flows through + /// into the ctor. + /// + private static int SerializeToPipeWriterCore(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout, bool emitChunkFraming) { if (value == null) { @@ -507,7 +562,8 @@ public static partial class AcBinarySerializer var runtimeType = value.GetType(); var context = BinarySerializationContextPool.Get(options); - context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush, flushTimeout); + + context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, emitChunkFraming, waitForFlush, flushTimeout); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); try diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs index 683ae57..7245688 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs @@ -1,9 +1,6 @@ -using System; using System.Buffers; using System.Diagnostics; -using System.IO; using System.Runtime.CompilerServices; -using System.Threading; namespace AyCode.Core.Serializers.Binaries; @@ -18,13 +15,19 @@ namespace AyCode.Core.Serializers.Binaries; /// .NET BCL convention for type-level Async prefix (AsyncEnumerable, /// IAsyncDisposable, AsyncLocal<T>, ...). /// +/// behavior is driven by the stripChunkFraming ctor flag: +/// true (default) — parses [201][UINT16][data] chunked frames + [202] end +/// marker (matches AsyncPipeWriterOutput framed output and SignalR's AsyncSegment wire +/// format); false — appends bytes verbatim (matches AcBinarySerializer.SerializeChunked +/// raw output drained from a ). +/// /// Usage modes: /// /// Push (Feed-API): producer thread calls with chunk bytes /// (typical for SignalR TryParseChunkData). /// Pull (DrainFromAsync extension): helper drains a /// into the input via repeated -/// calls (typical for NamedPipe / FileStream). +/// calls (typical for NamedPipe / FileStream / NetworkStream). /// /// /// Backed by a single contiguous byte[] from . Positions reset @@ -57,16 +60,21 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable private int _readPos; // consumer reports consumed position here private bool _completed; + // Whether Feed() should strip [201][UINT16][data] chunked framing (true, default — matches + // SignalR-style multiplexed wire) or append bytes verbatim (false — matches the raw output + // of SerializeChunked / single-shot byte[] over PipeWriter). + private readonly bool _stripChunkFraming; + // Framing state machine — parses [201][UINT16 LE size][data] frames + [202] CHUNK_END. - // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's output; - // identical parsing logic to AcBinaryHubProtocol.TryParseChunkData but stream-stateful - // across Feed(span) boundaries (the SignalR side has ReadOnlySequence with rewind; - // we get arbitrary spans). + // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's framed + // output and SignalR's AsyncSegment chunked frame format. Only active when + // _stripChunkFraming = true. private const byte ChunkStart = 200; // CHUNK_START — tolerated, skipped private const byte ChunkData = 201; // CHUNK_DATA — header followed by [UINT16 size][data] private const byte ChunkEnd = 202; // CHUNK_END — signals end-of-stream private FramingState _framingState = FramingState.AwaitingHeader; + private int _sizeAccumulator; // partial UINT16 size during AwaitingSizeLow/High private int _bytesRemainingInChunk; // remaining data bytes in current CHUNK_DATA frame @@ -108,36 +116,54 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable /// 4 KB chunk size, 128 KB for the standalone 64 KB default). /// /// Initial buffer size. Rounded up by ArrayPool. - /// Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds. - public AsyncPipeReaderInput(int initialCapacity) + /// + /// true (default): parses [201][UINT16][data] chunked frames + + /// [202] end marker (matches framed output and + /// SignalR's AsyncSegment chunked wire format). + /// false: appends bytes verbatim — for raw byte streams (matches + /// AcBinarySerializer.SerializeChunked output and the single-shot + /// byte[] output). + /// + public AsyncPipeReaderInput(int initialCapacity, bool stripChunkFraming = true) { if (initialCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(initialCapacity)); _buffer = ArrayPool.Shared.Rent(initialCapacity); + _stripChunkFraming = stripChunkFraming; _dataAvailable = new ManualResetEventSlim(false); } // --- Producer API (push) --- /// - /// Feeds raw chunked-wire bytes (any combination of complete or partial - /// [201][UINT16 LE size][data] frames, optional [200] CHUNK_START prefix, - /// trailing [202] CHUNK_END). Strips framing internally; only the unwrapped - /// data bytes land in the consumer-visible buffer. - /// - /// State is preserved across Feed calls — partial frame headers, mid-size - /// boundaries, and mid-data boundaries all resume correctly on the next call. - /// - /// Wire format identical to output and to - /// SignalR's AsyncSegment chunked frame format. Unified across all transports per ADR-0003. - /// - /// On [202], sets the completion flag and signals waiting consumers — equivalent - /// to an external call. Bytes after [202] are ignored. + /// Feeds bytes into the consumer-visible buffer. Behavior is driven by the + /// stripChunkFraming ctor flag: + /// + /// stripChunkFraming = true (default): expects the chunked wire format + /// [201][UINT16 LE size][data] per chunk, tolerates [200] + /// CHUNK_START prefix, and signals end-of-stream on [202] CHUNK_END. State + /// is preserved across Feed calls — partial frame headers, mid-size boundaries, + /// and mid-data boundaries all resume correctly. On [202], sets the completion + /// flag and signals waiting consumers — equivalent to an external + /// call. Bytes after [202] are ignored. + /// stripChunkFraming = false: appends bytes verbatim — no wire-format + /// interpretation. The producer must pass only payload bytes (e.g. raw byte stream + /// drained from a paired with + /// AcBinarySerializer.SerializeChunked). + /// /// public void Feed(ReadOnlySpan data) { if (data.IsEmpty) return; + if (!_stripChunkFraming) + { + // Raw mode: append verbatim, no framing interpretation. + AppendToBuffer(data); + return; + } + + // Framed mode: state machine parses [201][UINT16 LE size][data] frames + [202] end marker. var i = 0; while (i < data.Length) { @@ -214,7 +240,7 @@ public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable } /// - /// Appends unwrapped data bytes to the internal buffer with sliding-window cycling + /// Appends data bytes to the internal buffer with sliding-window cycling /// (reset to 0 when consumer has caught up) and grow-as-last-resort. Signals the consumer. /// private void AppendToBuffer(ReadOnlySpan data) diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index 685ff94..f29d66f 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -1,26 +1,34 @@ -using System; using System.Buffers; using System.Buffers.Binary; using System.Diagnostics; -using System.IO; using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; -using System.Threading.Tasks; namespace AyCode.Core.Serializers.Binaries; /// -/// Binary output that writes to a PipeWriter with per-chunk self-describing framing. +/// Binary output that writes to a PipeWriter chunk-by-chunk via the PipeWriter's natural slabbing. +/// Two wire-format modes are supported, selected by the emitChunkFraming ctor flag: /// -/// Each chunk (including the last) is framed as [201][UINT16 size][data] with a 3-byte header -/// reserved at the start of each buffer. The serializer context writes into the space after the -/// reserved bytes; on , the header is patched and the full chunk is committed via -/// Advance (zero-copy). does the same for the last (partial) chunk. +/// +/// emitChunkFraming = false (raw): pure AcBinary bytes are written into +/// the PipeWriter's slabs and committed via Advance — no per-chunk header bytes appear on the +/// wire. Bit-compatible with the single-shot Serialize(value, opts) → byte[] output. +/// The receiver can deserialize the byte stream as-is (e.g. via +/// AcBinaryDeserializer.Deserialize(byte[]) after collecting, or any raw +/// PipeReader-based path). /// -/// The protocol layer writes a single [202] byte after all chunks to signal end-of-stream. +/// emitChunkFraming = true (framed): each chunk gets a 3-byte header +/// [201][UINT16 size][data]. The header is reserved at the start of each acquired slab; +/// the serializer writes data after it, and on commit the size is patched and the full chunk +/// is Advanced (zero-copy). The protocol layer writes a single [202] byte after all +/// chunks to signal end-of-stream. This is the multiplexed wire format used by SignalR's +/// BinaryProtocolMode.AsyncSegment and any custom multiplexed protocol where the +/// receiver needs incremental chunk-boundary detection. +/// /// -/// Backpressure modes (controlled by waitForFlush): +/// Backpressure modes (controlled by waitForFlush) — independent of framing: /// /// waitForFlush=true (default): Grow() waits for the previous FlushAsync before /// starting a new chunk. Pro: maximum pipeline parallelism, guaranteed end-to-end zero-copy. @@ -32,12 +40,18 @@ namespace AyCode.Core.Serializers.Binaries; /// for that chunk. /// /// +/// Flush strategy auto-selects on writer type — Stream-backed PipeWriters +/// (PipeWriter.Create(Stream)) run sequentially per chunk because of the +/// StreamPipeWriter._tailMemory reset race; Pipe-based and Kestrel transport writers +/// keep parallelism. Orthogonal to the framing flag: all four (framed/raw) × (sequential/parallel) +/// combinations work. +/// /// Timeout safety: every synchronous flush-await is bounded by flushTimeout /// (default when the type is used directly; /// passes 10 s from its options). A /// propagates to the caller, allowing the connection to abort instead of blocking forever. /// -/// Maximum chunk data size: 65535 bytes (UINT16 max). +/// Maximum chunk data size (in framed mode): 65535 bytes (UINT16 max). /// public struct AsyncPipeWriterOutput : IBinaryOutputBase { @@ -51,16 +65,21 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public const int MaxChunkSize = ushort.MaxValue; /// - /// Cached runtime type, discovered via the public + /// Cached runtime type, discovered via the public /// factory at class-load /// time (no magic strings, no reflection lookup, refactor-safe — if MS ever renames the /// internal type, this auto-tracks). The dummy instance is unreachable after class init /// and GC-collected; the static field retains only the reference. + /// + /// internal visibility — exposed for sanity-check tests in + /// AyCode.Core.Tests (verifies new Pipe().Writer.GetType() != StreamPipeWriterType + /// and that the runtime detect can never accidentally fire on Pipe-based writers). /// - private static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType(); + internal static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType(); private readonly PipeWriter _pipeWriter; private readonly int _chunkSize; + private readonly bool _emitChunkFraming; private readonly bool _waitForFlush; private readonly bool _serializeFlushAndAcquire; private readonly TimeSpan _flushTimeout; @@ -80,31 +99,39 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase [Conditional("DEBUG")] private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message); - /// Creates an output bound to the given PipeWriter with self-describing chunked framing. - /// Target pipe (typically Kestrel's transport output for SignalR). + /// Creates an output bound to the given PipeWriter — framed or raw mode per . + /// Target pipe (typically Kestrel's transport output for SignalR, NamedPipe, FileStream, or any custom ). /// Per-chunk data size (max ). Default 4 KB matches Kestrel's slab size. + /// true → write [201][UINT16][data] per-chunk header (multiplexed wire format). + /// false → raw AcBinary bytes only, byte-compatible with the single-shot byte[] output. See class summary. /// See class summary — pipeline parallelism (true) vs adaptive (false). /// Per-flush timeout. null /// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers. - public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true, TimeSpan? flushTimeout = null) + public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool emitChunkFraming = true, bool waitForFlush = true, TimeSpan? flushTimeout = null) { if (chunkSize > MaxChunkSize) - throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, - $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max)."); + throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max)."); _pipeWriter = pipeWriter; _chunkSize = chunkSize; + _emitChunkFraming = emitChunkFraming; _waitForFlush = waitForFlush; + // null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms). // A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller. - _flushTimeout = flushTimeout ?? System.Threading.Timeout.InfiniteTimeSpan; + _flushTimeout = flushTimeout ?? Timeout.InfiniteTimeSpan; + // StreamPipeWriter (PipeWriter.Create(Stream)) resets internal _tailMemory to default // at FlushAsync completion — racing with the AcquireChunk-during-flush parallelism this // class deliberately uses. For Stream-backed writers, fully await the just-started flush // before acquiring the next chunk's memory (the writer-correct usage pattern; flush is // a real I/O operation here). Pipe-based writers (Kestrel transport, SignalR) do NOT // reset state on flush completion → the parallelism feature stays intact for them. - _serializeFlushAndAcquire = pipeWriter.GetType() == StreamPipeWriterType; + // + // IsAssignableFrom (not ==) so any future BCL subclass of StreamPipeWriter automatically + // picks the safe sequential path — forward-compat, no silent breakage on .NET upgrades. + _serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter); + _committedBytes = 0; _ownedBuffer = false; _lastFlush = default; @@ -125,29 +152,31 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase var task = vt.AsTask(); if (!task.Wait(_flushTimeout)) - throw new TimeoutException( - $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + - "consumer may be too slow, stuck, or disconnected."); + throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected."); // Completed within timeout — propagate any faulted exception task.GetAwaiter().GetResult(); } /// - /// Provides the initial buffer from the PipeWriter with 3-byte header reservation. + /// Provides the initial buffer from the PipeWriter — in framed mode, the buffer's first 3 + /// bytes are reserved for the chunk header (filled at ); + /// in raw mode, no reservation, the data starts at position = 0. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Initialize(out byte[] buffer, out int position, out int bufferEnd) { _committedBytes = 0; _lastFlush = default; + AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); _currentChunkStart = position; } /// - /// Called when the context's buffer is full. Patches the chunk header [201][UINT16 size], - /// commits the chunk to the PipeWriter, and fires a background flush. + /// Called when the context's buffer is full. Commits the chunk to the PipeWriter (in framed + /// mode, patches the [201][UINT16 size] header before Advance; in raw mode, simply + /// Advances the data bytes), then fires a background flush and acquires the next chunk. /// [MethodImpl(MethodImplOptions.NoInlining)] public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) @@ -174,13 +203,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase // growth when the consumer is slow. // The conditional FlushAsync at the end avoids double-flush if the previous flush // is still in progress (waitForFlush=false skip path). - if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) - SyncAwaitFlush(_lastFlush); + if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); - if (_lastFlush.IsCompleted) - _lastFlush = _pipeWriter.FlushAsync(); + if (_lastFlush.IsCompleted) _lastFlush = _pipeWriter.FlushAsync(); } // Acquire new chunk with header reservation (common to both paths). @@ -195,9 +222,10 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart); /// - /// Commits the last (partial) chunk to the PipeWriter with [201][UINT16 size] header. - /// Zero-copy: patches the reserved header bytes and calls Advance — no data copying. - /// Does NOT flush to network — the protocol writes [202] and flushes after. + /// Commits the last (partial) chunk to the PipeWriter — in framed mode patches the + /// [201][UINT16 size] header before Advance, in raw mode simply Advances the data. + /// Zero-copy: no data copying. Does NOT flush to network — in framed mode the protocol writes + /// [202] and flushes after. /// public void Flush(byte[] buffer, int position) { @@ -213,7 +241,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public void Reset() { } /// - /// Patches [201][UINT16 dataBytes] into the reserved header and commits via Advance. + /// Commits the current chunk to the PipeWriter. In framed mode, patches the reserved + /// [201][UINT16 dataBytes] header before Advance; in raw mode, simply Advances the data. /// For owned buffers, copies to PipeWriter first. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -222,16 +251,25 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase var dataBytes = position - _currentChunkStart; if (dataBytes <= 0) return; - var headerStart = _currentChunkStart - HeaderSize; - buffer[headerStart] = ChunkDataMarker; - BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); + if (_emitChunkFraming) + { + var headerStart = _currentChunkStart - HeaderSize; + buffer[headerStart] = ChunkDataMarker; - EmitDiagnostic($"CommitCurrentChunk: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})"); + BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); - if (_ownedBuffer) - FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); + EmitDiagnostic($"CommitCurrentChunk[framed]: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})"); + + if (_ownedBuffer) FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); + else _pipeWriter.Advance(HeaderSize + dataBytes); + } else - _pipeWriter.Advance(HeaderSize + dataBytes); + { + EmitDiagnostic($"CommitCurrentChunk[raw]: dataBytes={dataBytes} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({dataBytes})"); + + if (_ownedBuffer) FlushOwnedBuffer(buffer, _currentChunkStart, dataBytes); + else _pipeWriter.Advance(dataBytes); + } _committedBytes += dataBytes; // only count data bytes, not framing } @@ -240,8 +278,10 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private void FlushOwnedBuffer(byte[] buffer, int start, int length) { var span = _pipeWriter.GetSpan(length); + buffer.AsSpan(start, length).CopyTo(span); _pipeWriter.Advance(length); + ArrayPool.Shared.Return(buffer); _ownedBuffer = false; } @@ -249,16 +289,20 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) { var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize); - var totalRequest = dataSize + HeaderSize; + + // Header reservation only in framed mode — raw mode skips it for byte-compat with the + // single-shot byte[] output (each chunk holds pure AcBinary bytes, no markers). + var headerOffset = _emitChunkFraming ? HeaderSize : 0; + var totalRequest = dataSize + headerOffset; var memory = _pipeWriter.GetMemory(totalRequest); - EmitDiagnostic($"AcquireChunk: requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}"); + EmitDiagnostic($"AcquireChunk: framed={_emitChunkFraming} requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}"); if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && segment.Array != null) { buffer = segment.Array; - position = segment.Offset + HeaderSize; - bufferEnd = segment.Offset + HeaderSize + dataSize; + position = segment.Offset + headerOffset; + bufferEnd = segment.Offset + headerOffset + dataSize; _ownedBuffer = false; EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]"); @@ -267,8 +311,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase { var owned = ArrayPool.Shared.Rent(totalRequest); buffer = owned; - position = HeaderSize; - bufferEnd = HeaderSize + dataSize; + position = headerOffset; + bufferEnd = headerOffset + dataSize; _ownedBuffer = true; EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]"); diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index f204adc..11a76f3 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -139,21 +139,21 @@ public class AcBinaryHubProtocol : IHubProtocol _options = options.SerializerOptions; _options.BufferWriterChunkSize = options.BufferSize; + _protocolMode = options.ProtocolMode; _logger = options.Logger; _waitForFlush = options.WaitForFlush; _flushTimeout = options.FlushTimeout; + Name = options.Name; + _chunkStates = new ConditionalWeakTable(); - if (_logger != null) - { - _logger.LogInformation( - "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", - Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, - _waitForFlush, _flushTimeout.TotalMilliseconds, - _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); - } + _logger?.LogInformation( + "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", + Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, + _waitForFlush, _flushTimeout.TotalMilliseconds, + _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); } /// @@ -191,12 +191,9 @@ public class AcBinaryHubProtocol : IHubProtocol var task = vt.AsTask(); - if (!task.Wait(_flushTimeout)) - throw new TimeoutException( - $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + - "consumer may be too slow, stuck, or disconnected."); - - return task.GetAwaiter().GetResult(); + return task.Wait(_flushTimeout) + ? task.GetAwaiter().GetResult() + : throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected."); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -501,10 +498,12 @@ public class AcBinaryHubProtocol : IHubProtocol chunkStartPayload = bw.Position + externalBytes; bw.Flush(); + Unsafe.WriteUnaligned(ref lengthSpan[0], chunkStartPayload); _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload); } + SyncFlush(pipeWriter.FlushAsync()); // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- @@ -518,6 +517,7 @@ public class AcBinaryHubProtocol : IHubProtocol var endByte = pipeWriter.GetSpan(1); endByte[0] = MsgAsyncChunkEnd; pipeWriter.Advance(1); + SyncFlush(pipeWriter.FlushAsync()); _logger?.LogTrace("WriteMessageChunked CHUNK_END written"); diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs index 0925fad..c20f428 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs @@ -37,7 +37,7 @@ public sealed class AcBinaryHubProtocolOptions /// /// Ignored for Bytes and Segment modes. /// - public bool WaitForFlush { get; set; } = true; + public bool WaitForFlush { get; set; } = false; /// /// Maximum wait for a single synchronous FlushAsync before throwing