diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs index 774e0ff..d07f894 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs @@ -1,3 +1,5 @@ +using System.IO.Pipelines; +using System.IO.Pipes; using AyCode.Core.Serializers.Binaries; using AyCode.Core.Tests.TestModels; using static AyCode.Core.Tests.TestModels.AcSerializerModels; @@ -5,15 +7,20 @@ using static AyCode.Core.Tests.TestModels.AcSerializerModels; namespace AyCode.Core.Tests.Serialization; /// -/// Cross-platform NamedPipe IPC roundtrip tests for AcBinarySerializer's full-lifecycle helpers -/// (Step 4 of ADR-0003, ACCORE-BIN-T-A3T8). +/// Cross-platform NamedPipe IPC roundtrip tests for AcBinarySerializer's transport-agnostic +/// streaming helpers (Step 4 of ADR-0003, ACCORE-BIN-T-A3T8). /// -/// SerializeToNamedPipeAsync and DeserializeFromNamedPipeAsync internally -/// exercise the full streaming pipeline: AcBinarySerializer.Serialize → PipeWriter → -/// NamedPipe → PipeReader → AsyncPipeReaderInput.DrainFromAsync → AcBinaryDeserializer.Deserialize. -/// With BufferWriterChunkSize = 256, even small test payloads cross multiple chunk -/// boundaries on the wire — exercises the real chunking + sliding-window cycling behavior -/// instead of the "fits-in-one-chunk" degenerate case. +/// The serializer/deserializer surface intentionally has NO NamedPipe-specific helpers — +/// the tests own the / +/// lifecycle directly and call the generic +/// + +/// primitives. This proves +/// the streaming framework works on arbitrary PipeWriter/PipeReader sources +/// (NamedPipe, FileStream, NetworkStream, custom transports) without per-transport adapters in +/// the framework. +/// +/// With BufferWriterChunkSize = 256, even small test payloads cross multiple chunk +/// boundaries on the wire — exercises the real chunking + sliding-window cycling behavior. /// [TestClass] public class AcBinarySerializerNamedPipeTests @@ -21,25 +28,13 @@ public class AcBinarySerializerNamedPipeTests [TestMethod] public async Task RoundTrip_SmallChunkSize_PayloadEquals() { - // Unique pipe name per test run to avoid cross-run interference. var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}"; - // 256-byte chunk size = Kestrel slab default; the AsyncPipeWriterOutput on a - // StreamPipeWriter (NamedPipe-backed) currently misbehaves on chunkSize < 256 - // (ArgumentOutOfRangeException in StreamPipeWriter.Advance — pre-existing latent - // issue in AsyncPipeWriterOutput, not introduced here). Tracked separately; this - // test uses a known-working chunk size that still exercises framing across - // multiple chunks for our 50-item payload. + // 256-byte chunk size = Kestrel slab default; small enough to force multi-chunk framing + // for our 50-item payload, exercises the AsyncSegment chunked wire format end-to-end. var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 }; var original = CreatePayload(50); - // Start the receiver first — DeserializeFromNamedPipeAsync's synchronous prefix - // (NamedPipeServerStream ctor) runs before the first await, so the pipe is bound - // by the time this line returns and the client can immediately connect. - var receiveTask = AcBinaryDeserializer.DeserializeFromNamedPipeAsync(pipeName, opts); - - await AcBinarySerializer.SerializeToNamedPipeAsync(pipeName, original, opts); - - var result = await receiveTask; + var result = await RunNamedPipeRoundTripAsync(pipeName, original, opts); Assert.IsNotNull(result); AssertPayloadEquals(original, result); @@ -51,10 +46,10 @@ public class AcBinarySerializerNamedPipeTests // Production-scale payload via TestDataFactory: 100 root items × 3 pallets × 3 measurements × 4 points // = ~3700 deeply-nested objects with shared references (50 tags, 20 users, metadata, 10 categories). // Serialized size ~few hundred KB → many chunks at chunkSize=256 → real backpressure-driven streaming - // (PipeWriter pauseThreshold ~64KB, bytes flow incrementally as consumer drains). + // (sequential per-chunk flush on StreamPipeWriter, bytes flow incrementally as consumer drains). #if DEBUG - // Capture BOTH receiver and sender state to diagnose the StreamPipeWriter interaction. + // Capture BOTH receiver and sender state to diagnose StreamPipeWriter interaction if needed. var diagLogs = new List(); AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}"); AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}"); @@ -65,9 +60,7 @@ public class AcBinarySerializerNamedPipeTests var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 }; var original = TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100); - var receiveTask = AcBinaryDeserializer.DeserializeFromNamedPipeAsync(pipeName, opts); - await AcBinarySerializer.SerializeToNamedPipeAsync(pipeName, original, opts); - var result = await receiveTask; + var result = await RunNamedPipeRoundTripAsync(pipeName, original, opts); Assert.IsNotNull(result); Assert.AreEqual(original.Id, result.Id); @@ -103,20 +96,60 @@ public class AcBinarySerializerNamedPipeTests } } + /// + /// Owns the full NamedPipe lifecycle: binds server, accepts connect, drives the generic + /// on + /// the client side and + /// on the server side. The framework helpers know nothing about NamedPipe — only PipeWriter / + /// PipeReader. + /// + private static async Task RunNamedPipeRoundTripAsync(string pipeName, T original, AcBinarySerializerOptions opts) + { + // Server-side bind is synchronous (NamedPipeServerStream ctor registers the pipe with + // the OS), so the client can immediately attempt connect once we hand off to async. + await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); + + var receiveTask = Task.Run(async () => + { + await pipeServer.WaitForConnectionAsync().ConfigureAwait(false); + var pipeReader = PipeReader.Create(pipeServer); + + return await AcBinaryDeserializer.DeserializeFromPipeReaderAsync(pipeReader, opts).ConfigureAwait(false); + }); + + await using var pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous); + await pipeClient.ConnectAsync().ConfigureAwait(false); + + var pipeWriter = PipeWriter.Create(pipeClient); + try + { + // Public PipeWriter overload — auto-selects sequential flush strategy because + // PipeWriter.Create(stream) returns StreamPipeWriter (race-incompatible with parallel send). + AcBinarySerializer.Serialize(original, pipeWriter, opts); + } + finally + { + await pipeWriter.CompleteAsync().ConfigureAwait(false); + } + + return await receiveTask.ConfigureAwait(false); + } + private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder order) { - int items = order.Items.Count; + var items = order.Items.Count; int pallets = 0, measurements = 0, points = 0; + foreach (var item in order.Items) { pallets += item.Pallets.Count; foreach (var p in item.Pallets) { measurements += p.Measurements.Count; - foreach (var m in p.Measurements) - points += m.Points.Count; + points += p.Measurements.Sum(m => m.Points.Count); } } + return (items, pallets, measurements, points); } diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.NamedPipe.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.NamedPipe.cs deleted file mode 100644 index 6f5c4ba..0000000 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.NamedPipe.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System; -using System.IO.Pipelines; -using System.IO.Pipes; -using System.Threading; -using System.Threading.Tasks; - -namespace AyCode.Core.Serializers.Binaries; - -public static partial class AcBinaryDeserializer -{ - /// - /// Deserializes from a single named-pipe client connection using AsyncSegment chunked - /// streaming. One-shot server lifecycle: creates pipe server, awaits connection, drains - /// via while a background task - /// deserializes incrementally from the same , then - /// disposes. - /// - /// Receive buffer initial capacity is derived from options.BufferWriterChunkSize × 2 - /// (the streaming-doctrine heuristic per ADR-0003 §4 — two-chunks-worth of headroom plus - /// reset-to-0 cycling reuses the same buffer for the message's lifetime regardless of total - /// payload size). - /// - /// Cross-platform: NamedPipe BCL APIs work on Windows and Linux (Unix-domain- - /// socket-backed on Linux). WASM throws per BCL - /// contract. - /// - /// For custom connection management (multiple reads, custom NamedPipe options, - /// pre-existing connection): use - /// + - /// directly on your own - /// . - /// - /// Pipe name to await connection on. - /// Serializer options. Defaults to . - /// BufferWriterChunkSize controls the receive-side initial buffer - /// (BufferWriterChunkSize × 2). - /// Cancellation token. For connect-timeout, pass the token of a - /// new CancellationTokenSource(timeout). - public static async Task DeserializeFromNamedPipeAsync(string pipeName, AcBinarySerializerOptions? options = null, CancellationToken ct = default) - { - if (pipeName is null) throw new ArgumentNullException(nameof(pipeName)); - - var opts = options ?? AcBinarySerializerOptions.Default; - - await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous); - - await pipeServer.WaitForConnectionAsync(ct).ConfigureAwait(false); - var pipeReader = PipeReader.Create(pipeServer); - - using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); - var deserTask = Task.Run(() => Deserialize(input, opts), ct); - - await input.DrainFromAsync(pipeReader, ct).ConfigureAwait(false); - return await deserTask.ConfigureAwait(false); - } -} diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index 9452a17..ecce44e 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -9,6 +9,8 @@ using System.Reflection; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Text; +using System.Threading; +using System.Threading.Tasks; using AyCode.Core.Helpers; using AyCode.Core.Serializers.Expressions; using static AyCode.Core.Helpers.JsonUtilities; @@ -288,7 +290,7 @@ public static partial class AcBinaryDeserializer return Deserialize(seg2.Array!, seg2.Offset, seg2.Count, targetType, options); VerifyAgainstLinearized(data, targetType, options); - return DeserializeSequence(new SequenceBinaryInput(data), targetType, options); + return DeserializeSequence(new SequenceBinaryInput(data), targetType, options); } /// @@ -298,7 +300,7 @@ public static partial class AcBinaryDeserializer /// blocking on when data is exhausted. /// public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options) - => DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options); + => DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options); /// /// Deserialize from an with streaming pipeline parallelism. @@ -312,8 +314,7 @@ public static partial class AcBinaryDeserializer /// struct satisfies the JIT-specialization constraint of the generic deserialization path /// without exposing a value-type wrapper to the public API. /// - public static T? Deserialize(AsyncPipeReaderInput input) - => Deserialize(input, AcBinarySerializerOptions.Default); + public static T? Deserialize(AsyncPipeReaderInput input) => Deserialize(input, AcBinarySerializerOptions.Default); /// public static T? Deserialize(AsyncPipeReaderInput input, AcBinarySerializerOptions options) @@ -321,7 +322,44 @@ public static partial class AcBinaryDeserializer /// public static object? Deserialize(AsyncPipeReaderInput input, Type targetType, AcBinarySerializerOptions options) - => DeserializeSequence(new AsyncPipeReaderInputAdapter(input), targetType, options); + => DeserializeSequence(new AsyncPipeReaderInputAdapter(input), targetType, options); + + /// + /// Deserialize from a with full streaming pipeline + /// parallelism — drains the reader on the calling thread, while a background Task.Run + /// deserializes incrementally from the same shared . + /// + /// Transport-agnostic: works with any PipeReader source — NamedPipe IPC + /// (PipeReader.Create(namedPipeServerStream)), file-stream + /// (PipeReader.Create(fileStream)), TCP (PipeReader.Create(networkStream)), + /// or custom PipeReader implementations. Strips the [201][UINT16 size][data] + /// chunked framing internally via . + /// + /// Receive buffer initial capacity is derived from options.BufferWriterChunkSize × 2 + /// — two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the + /// message's lifetime regardless of total payload size. + /// + /// For the producer side: see + /// + /// or . + /// + /// Source pipe reader. Caller owns lifecycle (creation + completion). + /// Serializer options. Defaults to . + /// BufferWriterChunkSize controls the receive-side initial buffer (× 2 headroom). + /// Cancellation token. For connect-timeout, pass the token of a + /// new CancellationTokenSource(timeout). + public static async Task DeserializeFromPipeReaderAsync(System.IO.Pipelines.PipeReader reader, AcBinarySerializerOptions? options = null, CancellationToken ct = default) + { + if (reader is null) throw new ArgumentNullException(nameof(reader)); + + var opts = options ?? AcBinarySerializerOptions.Default; + + using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2); + var deserTask = Task.Run(() => Deserialize(input, opts), ct); + + await input.DrainFromAsync(reader, ct).ConfigureAwait(false); + return await deserTask.ConfigureAwait(false); + } /// /// Internal: Deserialize with any TInput (multi-segment or other future input types). diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.NamedPipe.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.NamedPipe.cs deleted file mode 100644 index 19e6d32..0000000 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.NamedPipe.cs +++ /dev/null @@ -1,67 +0,0 @@ -using System; -using System.IO.Pipelines; -using System.IO.Pipes; -using System.Threading; -using System.Threading.Tasks; - -namespace AyCode.Core.Serializers.Binaries; - -public static partial class AcBinarySerializer -{ - /// - /// Serializes to a Windows / Linux named-pipe server using the - /// AsyncSegment chunked wire format ([201][UINT16 size][data] per chunk via - /// ). One-shot client lifecycle: connects, streams - /// chunk-by-chunk with per-chunk FlushAsync for real producer/consumer overlap, - /// completes, disposes. - /// - /// Wire format: same chunked AsyncSegment framing as SignalR uses internally — - /// unified format across all transports per ADR-0003 §9. The +5 bytes/chunk overhead - /// (~0.1% at 4 KB chunks, ~2% at 256-byte test chunks) is the cost of a single shared wire - /// format and a single framing-strip implementation (in ). - /// - /// Streaming behavior: every BufferWriterChunkSize-sized chunk is - /// flushed to the pipe immediately (per-chunk SyncAwaitFlush). Consumer can start - /// reading WHILE producer is still serializing — true pipeline parallelism even on small - /// payloads (no buffer-accumulation-then-flush behavior). - /// - /// Cross-platform: NamedPipe BCL APIs work on Windows and Linux (Unix-domain- - /// socket-backed on Linux). WASM throws per - /// BCL contract. - /// - /// For custom connection management (multiple writes, custom NamedPipe options, - /// pre-existing connection): use - /// - /// directly on a wrapping your own - /// . - /// - /// Pipe name to connect to. - /// Object to serialize. - /// Serializer options. Defaults to . - /// BufferWriterChunkSize controls the wire chunk size (max 65535). - /// NamedPipe server host. Defaults to "." (local machine). - /// Cancellation token. For connect-timeout, pass the token of a - /// new CancellationTokenSource(timeout) — uniform cancellation/timeout pattern. - public static async Task SerializeToNamedPipeAsync(string pipeName, T value, AcBinarySerializerOptions? options = null, string serverName = ".", CancellationToken ct = default) - { - if (pipeName is null) throw new ArgumentNullException(nameof(pipeName)); - if (serverName is null) throw new ArgumentNullException(nameof(serverName)); - - await using var pipeClient = new NamedPipeClientStream(serverName, pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous); - - await pipeClient.ConnectAsync(ct).ConfigureAwait(false); - - var pipeWriter = PipeWriter.Create(pipeClient); - try - { - // PipeWriter overload — chunked AsyncSegment framing via AsyncPipeWriterOutput. - // Receiver's AsyncPipeReaderInput.Feed strips framing internally; unified wire format - // across all transports per ADR-0003 §9. - Serialize(value, pipeWriter, options ?? AcBinarySerializerOptions.Default); - } - finally - { - await pipeWriter.CompleteAsync().ConfigureAwait(false); - } - } -} diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 1f159dd..1a95d48 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -423,25 +423,78 @@ public static partial class AcBinarySerializer } /// - /// Serialize to PipeWriter with chunked protocol framing via . - /// Each chunk (including the last) is framed as [201][UINT16 size][data] and committed - /// to the PipeWriter via Advance (zero-copy). The protocol layer writes a single [202] - /// byte after to signal end-of-stream. + /// Serialize to a with chunked protocol framing via + /// — gives the caller full + /// + control because + /// is always the BCL PipeWriterImpl, which is parallel-capable (no _tailMemory + /// reset race like StreamPipeWriter). + /// + /// Each chunk (including the last) is framed as [201][UINT16 size][data] and + /// committed to pipe.Writer via Advance (zero-copy). A consumer drains + /// pipe.Reader on a background task and writes to the actual transport. + /// + /// Use this overload when you constructed new Pipe() yourself and need + /// runtime tuning of the flush strategy. For arbitrary + /// (Kestrel transport output, PipeWriter.Create(stream), custom writers), use the + /// + /// overload. /// /// The value to serialize; null writes a single null marker. - /// Target pipe (typically Kestrel's transport output). + /// Target pipe — caller drains pipe.Reader elsewhere. /// Serializer options (type wrappers, reference handling, interning, etc.). /// /// Per-chunk flush synchronization. true (default): maximum pipeline parallelism, - /// guaranteed zero-copy, but slow consumers block the server thread (bounded by ). - /// false: adaptive backpressure via memory threshold — safer for mixed consumer speeds. + /// guaranteed zero-copy + zero-alloc, but slow consumers block the producer thread (bounded by + /// ). false: adaptive backpressure via memory threshold + /// (~64KB in-flight) — safer for mixed consumer speeds, never blocks on slow consumers. /// /// - /// Per-flush timeout. null → wait forever (legacy). Positive value: throws + /// Per-flush timeout. null → wait forever. Positive value: throws /// on stuck consumers. /// /// Total serialized data bytes (excluding framing overhead). - public static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + public static int Serialize(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) + { + if (pipe is null) throw new ArgumentNullException(nameof(pipe)); + return Serialize(value, pipe.Writer, options, waitForFlush, flushTimeout); + } + + /// + /// Serialize to any with chunked protocol framing + /// via . Each chunk (including the last) is framed as + /// [201][UINT16 size][data] and committed to the PipeWriter via Advance (zero-copy). + /// + /// Flush strategy is auto-selected by writer type: + /// StreamPipeWriter (from PipeWriter.Create(stream) — NamedPipe, FileStream, + /// NetworkStream, etc.) runs sequentially per chunk because the BCL impl resets + /// _tailMemory on flush completion (race-incompatible with parallel send). All other + /// PipeWriter implementations (Kestrel transport, custom impls) run with the safe + /// waitForFlush=true default — max parallelism, zero-alloc. + /// + /// Need runtime tuning of the flush strategy? If you control the pipe yourself, + /// build a instance and use the + /// + /// overload — only Pipe-based writers can guarantee parallel-capable flush behavior. + /// + /// The value to serialize; null writes a single null marker. + /// Target pipe writer. + /// Serializer options (type wrappers, reference handling, interning, etc.). + /// Total serialized data bytes (excluding framing overhead). + public static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) + => Serialize(value, pipeWriter, options, waitForFlush: true, flushTimeout: null); + + /// + /// Internal flush-tunable PipeWriter overload — only callable from AyCode.Services + /// (SignalR hub protocol) because external callers cannot safely choose + /// without knowing the concrete implementation. + /// SignalR uses Kestrel transport output, which is parallel-capable, and forwards the + /// hub-protocol-options-configured tuning here. + /// + /// For the public API, see the overload (parallel-capable, + /// tuning paramters available) or the simple overload + /// (auto-selects strategy, no tuning). + /// + internal static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout) { if (value == null) {