[LOADED_DOCS: 2 files, no new loads]
Remove NamedPipe helpers; make binary serializer transport-agnostic Removed NamedPipe-specific serializer/deserializer helpers from the framework. Refactored tests to manage NamedPipe lifecycle directly using generic PipeWriter/PipeReader APIs. Introduced a transport-agnostic async deserialization method for any PipeReader. Updated XML docs and method signatures to clarify usage and flush strategies. All NamedPipe logic is now test-only; the core binary serializer is fully transport-agnostic. Minor test cleanups included.
This commit is contained in:
parent
4a8c961d87
commit
4ca3f51632
|
|
@ -1,3 +1,5 @@
|
|||
using System.IO.Pipelines;
|
||||
using System.IO.Pipes;
|
||||
using AyCode.Core.Serializers.Binaries;
|
||||
using AyCode.Core.Tests.TestModels;
|
||||
using static AyCode.Core.Tests.TestModels.AcSerializerModels;
|
||||
|
|
@ -5,15 +7,20 @@ using static AyCode.Core.Tests.TestModels.AcSerializerModels;
|
|||
namespace AyCode.Core.Tests.Serialization;
|
||||
|
||||
/// <summary>
|
||||
/// Cross-platform NamedPipe IPC roundtrip tests for AcBinarySerializer's full-lifecycle helpers
|
||||
/// (Step 4 of ADR-0003, ACCORE-BIN-T-A3T8).
|
||||
/// Cross-platform NamedPipe IPC roundtrip tests for AcBinarySerializer's transport-agnostic
|
||||
/// streaming helpers (Step 4 of ADR-0003, ACCORE-BIN-T-A3T8).
|
||||
///
|
||||
/// <para><c>SerializeToNamedPipeAsync</c> and <c>DeserializeFromNamedPipeAsync</c> internally
|
||||
/// exercise the full streaming pipeline: <c>AcBinarySerializer.Serialize → PipeWriter →
|
||||
/// NamedPipe → PipeReader → AsyncPipeReaderInput.DrainFromAsync → AcBinaryDeserializer.Deserialize</c>.
|
||||
/// With <c>BufferWriterChunkSize = 256</c>, even small test payloads cross multiple chunk
|
||||
/// boundaries on the wire — exercises the real chunking + sliding-window cycling behavior
|
||||
/// instead of the "fits-in-one-chunk" degenerate case.</para>
|
||||
/// <para>The serializer/deserializer surface intentionally has NO NamedPipe-specific helpers —
|
||||
/// the tests own the <see cref="NamedPipeServerStream"/> / <see cref="NamedPipeClientStream"/>
|
||||
/// lifecycle directly and call the generic
|
||||
/// <see cref="AcBinarySerializer.Serialize{T}(T, PipeWriter, AcBinarySerializerOptions)"/> +
|
||||
/// <see cref="AcBinaryDeserializer.DeserializeFromPipeReaderAsync{T}"/> primitives. This proves
|
||||
/// the streaming framework works on arbitrary <c>PipeWriter</c>/<c>PipeReader</c> sources
|
||||
/// (NamedPipe, FileStream, NetworkStream, custom transports) without per-transport adapters in
|
||||
/// the framework.</para>
|
||||
///
|
||||
/// <para>With <c>BufferWriterChunkSize = 256</c>, even small test payloads cross multiple chunk
|
||||
/// boundaries on the wire — exercises the real chunking + sliding-window cycling behavior.</para>
|
||||
/// </summary>
|
||||
[TestClass]
|
||||
public class AcBinarySerializerNamedPipeTests
|
||||
|
|
@ -21,25 +28,13 @@ public class AcBinarySerializerNamedPipeTests
|
|||
[TestMethod]
|
||||
public async Task RoundTrip_SmallChunkSize_PayloadEquals()
|
||||
{
|
||||
// Unique pipe name per test run to avoid cross-run interference.
|
||||
var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}";
|
||||
// 256-byte chunk size = Kestrel slab default; the AsyncPipeWriterOutput on a
|
||||
// StreamPipeWriter (NamedPipe-backed) currently misbehaves on chunkSize < 256
|
||||
// (ArgumentOutOfRangeException in StreamPipeWriter.Advance — pre-existing latent
|
||||
// issue in AsyncPipeWriterOutput, not introduced here). Tracked separately; this
|
||||
// test uses a known-working chunk size that still exercises framing across
|
||||
// multiple chunks for our 50-item payload.
|
||||
// 256-byte chunk size = Kestrel slab default; small enough to force multi-chunk framing
|
||||
// for our 50-item payload, exercises the AsyncSegment chunked wire format end-to-end.
|
||||
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
|
||||
var original = CreatePayload(50);
|
||||
|
||||
// Start the receiver first — DeserializeFromNamedPipeAsync's synchronous prefix
|
||||
// (NamedPipeServerStream ctor) runs before the first await, so the pipe is bound
|
||||
// by the time this line returns and the client can immediately connect.
|
||||
var receiveTask = AcBinaryDeserializer.DeserializeFromNamedPipeAsync<TestParentWithDateTimeItemCollection>(pipeName, opts);
|
||||
|
||||
await AcBinarySerializer.SerializeToNamedPipeAsync(pipeName, original, opts);
|
||||
|
||||
var result = await receiveTask;
|
||||
var result = await RunNamedPipeRoundTripAsync(pipeName, original, opts);
|
||||
|
||||
Assert.IsNotNull(result);
|
||||
AssertPayloadEquals(original, result);
|
||||
|
|
@ -51,10 +46,10 @@ public class AcBinarySerializerNamedPipeTests
|
|||
// Production-scale payload via TestDataFactory: 100 root items × 3 pallets × 3 measurements × 4 points
|
||||
// = ~3700 deeply-nested objects with shared references (50 tags, 20 users, metadata, 10 categories).
|
||||
// Serialized size ~few hundred KB → many chunks at chunkSize=256 → real backpressure-driven streaming
|
||||
// (PipeWriter pauseThreshold ~64KB, bytes flow incrementally as consumer drains).
|
||||
// (sequential per-chunk flush on StreamPipeWriter, bytes flow incrementally as consumer drains).
|
||||
|
||||
#if DEBUG
|
||||
// Capture BOTH receiver and sender state to diagnose the StreamPipeWriter interaction.
|
||||
// Capture BOTH receiver and sender state to diagnose StreamPipeWriter interaction if needed.
|
||||
var diagLogs = new List<string>();
|
||||
AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}");
|
||||
AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}");
|
||||
|
|
@ -65,9 +60,7 @@ public class AcBinarySerializerNamedPipeTests
|
|||
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
|
||||
var original = TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100);
|
||||
|
||||
var receiveTask = AcBinaryDeserializer.DeserializeFromNamedPipeAsync<TestOrder>(pipeName, opts);
|
||||
await AcBinarySerializer.SerializeToNamedPipeAsync(pipeName, original, opts);
|
||||
var result = await receiveTask;
|
||||
var result = await RunNamedPipeRoundTripAsync(pipeName, original, opts);
|
||||
|
||||
Assert.IsNotNull(result);
|
||||
Assert.AreEqual(original.Id, result.Id);
|
||||
|
|
@ -103,20 +96,60 @@ public class AcBinarySerializerNamedPipeTests
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Owns the full NamedPipe lifecycle: binds server, accepts connect, drives the generic
|
||||
/// <see cref="AcBinarySerializer.Serialize{T}(T, PipeWriter, AcBinarySerializerOptions)"/> on
|
||||
/// the client side and <see cref="AcBinaryDeserializer.DeserializeFromPipeReaderAsync{T}"/>
|
||||
/// on the server side. The framework helpers know nothing about NamedPipe — only PipeWriter /
|
||||
/// PipeReader.
|
||||
/// </summary>
|
||||
private static async Task<T?> RunNamedPipeRoundTripAsync<T>(string pipeName, T original, AcBinarySerializerOptions opts)
|
||||
{
|
||||
// Server-side bind is synchronous (NamedPipeServerStream ctor registers the pipe with
|
||||
// the OS), so the client can immediately attempt connect once we hand off to async.
|
||||
await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
|
||||
|
||||
var receiveTask = Task.Run(async () =>
|
||||
{
|
||||
await pipeServer.WaitForConnectionAsync().ConfigureAwait(false);
|
||||
var pipeReader = PipeReader.Create(pipeServer);
|
||||
|
||||
return await AcBinaryDeserializer.DeserializeFromPipeReaderAsync<T>(pipeReader, opts).ConfigureAwait(false);
|
||||
});
|
||||
|
||||
await using var pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
|
||||
await pipeClient.ConnectAsync().ConfigureAwait(false);
|
||||
|
||||
var pipeWriter = PipeWriter.Create(pipeClient);
|
||||
try
|
||||
{
|
||||
// Public PipeWriter overload — auto-selects sequential flush strategy because
|
||||
// PipeWriter.Create(stream) returns StreamPipeWriter (race-incompatible with parallel send).
|
||||
AcBinarySerializer.Serialize(original, pipeWriter, opts);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await pipeWriter.CompleteAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return await receiveTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder order)
|
||||
{
|
||||
int items = order.Items.Count;
|
||||
var items = order.Items.Count;
|
||||
int pallets = 0, measurements = 0, points = 0;
|
||||
|
||||
foreach (var item in order.Items)
|
||||
{
|
||||
pallets += item.Pallets.Count;
|
||||
foreach (var p in item.Pallets)
|
||||
{
|
||||
measurements += p.Measurements.Count;
|
||||
foreach (var m in p.Measurements)
|
||||
points += m.Points.Count;
|
||||
points += p.Measurements.Sum(m => m.Points.Count);
|
||||
}
|
||||
}
|
||||
|
||||
return (items, pallets, measurements, points);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,56 +0,0 @@
|
|||
using System;
|
||||
using System.IO.Pipelines;
|
||||
using System.IO.Pipes;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace AyCode.Core.Serializers.Binaries;
|
||||
|
||||
public static partial class AcBinaryDeserializer
|
||||
{
|
||||
/// <summary>
|
||||
/// Deserializes from a single named-pipe client connection using AsyncSegment chunked
|
||||
/// streaming. One-shot server lifecycle: creates pipe server, awaits connection, drains
|
||||
/// via <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/> while a background task
|
||||
/// deserializes incrementally from the same <see cref="AsyncPipeReaderInput"/>, then
|
||||
/// disposes.
|
||||
///
|
||||
/// <para>Receive buffer initial capacity is derived from <c>options.BufferWriterChunkSize × 2</c>
|
||||
/// (the streaming-doctrine heuristic per ADR-0003 §4 — two-chunks-worth of headroom plus
|
||||
/// reset-to-0 cycling reuses the same buffer for the message's lifetime regardless of total
|
||||
/// payload size).</para>
|
||||
///
|
||||
/// <para><b>Cross-platform</b>: NamedPipe BCL APIs work on Windows and Linux (Unix-domain-
|
||||
/// socket-backed on Linux). WASM throws <see cref="PlatformNotSupportedException"/> per BCL
|
||||
/// contract.</para>
|
||||
///
|
||||
/// <para><b>For custom connection management</b> (multiple reads, custom NamedPipe options,
|
||||
/// pre-existing connection): use
|
||||
/// <see cref="Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/> +
|
||||
/// <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/> directly on your own
|
||||
/// <see cref="NamedPipeServerStream"/>.</para>
|
||||
/// </summary>
|
||||
/// <param name="pipeName">Pipe name to await connection on.</param>
|
||||
/// <param name="options">Serializer options. Defaults to <see cref="AcBinarySerializerOptions.Default"/>.
|
||||
/// <c>BufferWriterChunkSize</c> controls the receive-side initial buffer
|
||||
/// (<c>BufferWriterChunkSize × 2</c>).</param>
|
||||
/// <param name="ct">Cancellation token. For connect-timeout, pass the token of a
|
||||
/// <c>new CancellationTokenSource(timeout)</c>.</param>
|
||||
public static async Task<T?> DeserializeFromNamedPipeAsync<T>(string pipeName, AcBinarySerializerOptions? options = null, CancellationToken ct = default)
|
||||
{
|
||||
if (pipeName is null) throw new ArgumentNullException(nameof(pipeName));
|
||||
|
||||
var opts = options ?? AcBinarySerializerOptions.Default;
|
||||
|
||||
await using var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
|
||||
|
||||
await pipeServer.WaitForConnectionAsync(ct).ConfigureAwait(false);
|
||||
var pipeReader = PipeReader.Create(pipeServer);
|
||||
|
||||
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
|
||||
var deserTask = Task.Run(() => Deserialize<T>(input, opts), ct);
|
||||
|
||||
await input.DrainFromAsync(pipeReader, ct).ConfigureAwait(false);
|
||||
return await deserTask.ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
|
@ -9,6 +9,8 @@ using System.Reflection;
|
|||
using System.Runtime.CompilerServices;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using AyCode.Core.Helpers;
|
||||
using AyCode.Core.Serializers.Expressions;
|
||||
using static AyCode.Core.Helpers.JsonUtilities;
|
||||
|
|
@ -288,7 +290,7 @@ public static partial class AcBinaryDeserializer
|
|||
return Deserialize(seg2.Array!, seg2.Offset, seg2.Count, targetType, options);
|
||||
|
||||
VerifyAgainstLinearized(data, targetType, options);
|
||||
return DeserializeSequence<SequenceBinaryInput>(new SequenceBinaryInput(data), targetType, options);
|
||||
return DeserializeSequence(new SequenceBinaryInput(data), targetType, options);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -298,7 +300,7 @@ public static partial class AcBinaryDeserializer
|
|||
/// blocking on <see cref="System.Threading.ManualResetEventSlim"/> when data is exhausted.
|
||||
/// </summary>
|
||||
public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options)
|
||||
=> DeserializeSequence<SegmentBufferReaderInput>(new SegmentBufferReaderInput(reader), targetType, options);
|
||||
=> DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options);
|
||||
|
||||
/// <summary>
|
||||
/// Deserialize from an <see cref="AsyncPipeReaderInput"/> with streaming pipeline parallelism.
|
||||
|
|
@ -312,8 +314,7 @@ public static partial class AcBinaryDeserializer
|
|||
/// struct satisfies the JIT-specialization constraint of the generic deserialization path
|
||||
/// without exposing a value-type wrapper to the public API.</para>
|
||||
/// </summary>
|
||||
public static T? Deserialize<T>(AsyncPipeReaderInput input)
|
||||
=> Deserialize<T>(input, AcBinarySerializerOptions.Default);
|
||||
public static T? Deserialize<T>(AsyncPipeReaderInput input) => Deserialize<T>(input, AcBinarySerializerOptions.Default);
|
||||
|
||||
/// <inheritdoc cref="Deserialize{T}(AsyncPipeReaderInput)"/>
|
||||
public static T? Deserialize<T>(AsyncPipeReaderInput input, AcBinarySerializerOptions options)
|
||||
|
|
@ -321,7 +322,44 @@ public static partial class AcBinaryDeserializer
|
|||
|
||||
/// <inheritdoc cref="Deserialize{T}(AsyncPipeReaderInput)"/>
|
||||
public static object? Deserialize(AsyncPipeReaderInput input, Type targetType, AcBinarySerializerOptions options)
|
||||
=> DeserializeSequence<AsyncPipeReaderInputAdapter>(new AsyncPipeReaderInputAdapter(input), targetType, options);
|
||||
=> DeserializeSequence(new AsyncPipeReaderInputAdapter(input), targetType, options);
|
||||
|
||||
/// <summary>
|
||||
/// Deserialize from a <see cref="System.IO.Pipelines.PipeReader"/> with full streaming pipeline
|
||||
/// parallelism — drains the reader on the calling thread, while a background <c>Task.Run</c>
|
||||
/// deserializes incrementally from the same shared <see cref="AsyncPipeReaderInput"/>.
|
||||
///
|
||||
/// <para>Transport-agnostic: works with any <c>PipeReader</c> source — NamedPipe IPC
|
||||
/// (<c>PipeReader.Create(namedPipeServerStream)</c>), file-stream
|
||||
/// (<c>PipeReader.Create(fileStream)</c>), TCP (<c>PipeReader.Create(networkStream)</c>),
|
||||
/// or custom <c>PipeReader</c> implementations. Strips the <c>[201][UINT16 size][data]</c>
|
||||
/// chunked framing internally via <see cref="AsyncPipeReaderInput.Feed"/>.</para>
|
||||
///
|
||||
/// <para>Receive buffer initial capacity is derived from <c>options.BufferWriterChunkSize × 2</c>
|
||||
/// — two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the
|
||||
/// message's lifetime regardless of total payload size.</para>
|
||||
///
|
||||
/// <para><b>For the producer side</b>: see
|
||||
/// <see cref="AcBinarySerializer.Serialize{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/>
|
||||
/// or <see cref="AcBinarySerializer.Serialize{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>.</para>
|
||||
/// </summary>
|
||||
/// <param name="reader">Source pipe reader. Caller owns lifecycle (creation + completion).</param>
|
||||
/// <param name="options">Serializer options. Defaults to <see cref="AcBinarySerializerOptions.Default"/>.
|
||||
/// <c>BufferWriterChunkSize</c> controls the receive-side initial buffer (× 2 headroom).</param>
|
||||
/// <param name="ct">Cancellation token. For connect-timeout, pass the token of a
|
||||
/// <c>new CancellationTokenSource(timeout)</c>.</param>
|
||||
public static async Task<T?> DeserializeFromPipeReaderAsync<T>(System.IO.Pipelines.PipeReader reader, AcBinarySerializerOptions? options = null, CancellationToken ct = default)
|
||||
{
|
||||
if (reader is null) throw new ArgumentNullException(nameof(reader));
|
||||
|
||||
var opts = options ?? AcBinarySerializerOptions.Default;
|
||||
|
||||
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
|
||||
var deserTask = Task.Run(() => Deserialize<T>(input, opts), ct);
|
||||
|
||||
await input.DrainFromAsync(reader, ct).ConfigureAwait(false);
|
||||
return await deserTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Internal: Deserialize with any TInput (multi-segment or other future input types).
|
||||
|
|
|
|||
|
|
@ -1,67 +0,0 @@
|
|||
using System;
|
||||
using System.IO.Pipelines;
|
||||
using System.IO.Pipes;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace AyCode.Core.Serializers.Binaries;
|
||||
|
||||
public static partial class AcBinarySerializer
|
||||
{
|
||||
/// <summary>
|
||||
/// Serializes <paramref name="value"/> to a Windows / Linux named-pipe server using the
|
||||
/// AsyncSegment chunked wire format (<c>[201][UINT16 size][data]</c> per chunk via
|
||||
/// <see cref="AsyncPipeWriterOutput"/>). One-shot client lifecycle: connects, streams
|
||||
/// chunk-by-chunk with per-chunk <c>FlushAsync</c> for real producer/consumer overlap,
|
||||
/// completes, disposes.
|
||||
///
|
||||
/// <para><b>Wire format</b>: same chunked AsyncSegment framing as SignalR uses internally —
|
||||
/// unified format across all transports per ADR-0003 §9. The <c>+5 bytes/chunk</c> overhead
|
||||
/// (~0.1% at 4 KB chunks, ~2% at 256-byte test chunks) is the cost of a single shared wire
|
||||
/// format and a single framing-strip implementation (in <see cref="AsyncPipeReaderInput.Feed"/>).</para>
|
||||
///
|
||||
/// <para><b>Streaming behavior</b>: every <c>BufferWriterChunkSize</c>-sized chunk is
|
||||
/// flushed to the pipe immediately (per-chunk <c>SyncAwaitFlush</c>). Consumer can start
|
||||
/// reading WHILE producer is still serializing — true pipeline parallelism even on small
|
||||
/// payloads (no buffer-accumulation-then-flush behavior).</para>
|
||||
///
|
||||
/// <para><b>Cross-platform</b>: NamedPipe BCL APIs work on Windows and Linux (Unix-domain-
|
||||
/// socket-backed on Linux). WASM throws <see cref="PlatformNotSupportedException"/> per
|
||||
/// BCL contract.</para>
|
||||
///
|
||||
/// <para><b>For custom connection management</b> (multiple writes, custom NamedPipe options,
|
||||
/// pre-existing connection): use
|
||||
/// <see cref="Serialize{T}(T, PipeWriter, AcBinarySerializerOptions, bool, TimeSpan?)"/>
|
||||
/// directly on a <see cref="PipeWriter"/> wrapping your own
|
||||
/// <see cref="NamedPipeClientStream"/>.</para>
|
||||
/// </summary>
|
||||
/// <param name="pipeName">Pipe name to connect to.</param>
|
||||
/// <param name="value">Object to serialize.</param>
|
||||
/// <param name="options">Serializer options. Defaults to <see cref="AcBinarySerializerOptions.Default"/>.
|
||||
/// <c>BufferWriterChunkSize</c> controls the wire chunk size (max 65535).</param>
|
||||
/// <param name="serverName">NamedPipe server host. Defaults to <c>"."</c> (local machine).</param>
|
||||
/// <param name="ct">Cancellation token. For connect-timeout, pass the token of a
|
||||
/// <c>new CancellationTokenSource(timeout)</c> — uniform cancellation/timeout pattern.</param>
|
||||
public static async Task SerializeToNamedPipeAsync<T>(string pipeName, T value, AcBinarySerializerOptions? options = null, string serverName = ".", CancellationToken ct = default)
|
||||
{
|
||||
if (pipeName is null) throw new ArgumentNullException(nameof(pipeName));
|
||||
if (serverName is null) throw new ArgumentNullException(nameof(serverName));
|
||||
|
||||
await using var pipeClient = new NamedPipeClientStream(serverName, pipeName, PipeDirection.Out, System.IO.Pipes.PipeOptions.Asynchronous);
|
||||
|
||||
await pipeClient.ConnectAsync(ct).ConfigureAwait(false);
|
||||
|
||||
var pipeWriter = PipeWriter.Create(pipeClient);
|
||||
try
|
||||
{
|
||||
// PipeWriter overload — chunked AsyncSegment framing via AsyncPipeWriterOutput.
|
||||
// Receiver's AsyncPipeReaderInput.Feed strips framing internally; unified wire format
|
||||
// across all transports per ADR-0003 §9.
|
||||
Serialize(value, pipeWriter, options ?? AcBinarySerializerOptions.Default);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await pipeWriter.CompleteAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -423,25 +423,78 @@ public static partial class AcBinarySerializer
|
|||
}
|
||||
|
||||
/// <summary>
|
||||
/// Serialize to PipeWriter with chunked protocol framing via <see cref="AsyncPipeWriterOutput"/>.
|
||||
/// Each chunk (including the last) is framed as <c>[201][UINT16 size][data]</c> and committed
|
||||
/// to the PipeWriter via Advance (zero-copy). The protocol layer writes a single <c>[202]</c>
|
||||
/// byte after to signal end-of-stream.
|
||||
/// Serialize to a <see cref="System.IO.Pipelines.Pipe"/> with chunked protocol framing via
|
||||
/// <see cref="AsyncPipeWriterOutput"/> — gives the caller full <paramref name="waitForFlush"/>
|
||||
/// + <paramref name="flushTimeout"/> control because <see cref="System.IO.Pipelines.Pipe.Writer"/>
|
||||
/// is always the BCL <c>PipeWriterImpl</c>, which is parallel-capable (no <c>_tailMemory</c>
|
||||
/// reset race like <c>StreamPipeWriter</c>).
|
||||
///
|
||||
/// <para>Each chunk (including the last) is framed as <c>[201][UINT16 size][data]</c> and
|
||||
/// committed to <c>pipe.Writer</c> via <c>Advance</c> (zero-copy). A consumer drains
|
||||
/// <c>pipe.Reader</c> on a background task and writes to the actual transport.</para>
|
||||
///
|
||||
/// <para><b>Use this overload when</b> you constructed <c>new Pipe()</c> yourself and need
|
||||
/// runtime tuning of the flush strategy. For arbitrary <see cref="System.IO.Pipelines.PipeWriter"/>
|
||||
/// (Kestrel transport output, <c>PipeWriter.Create(stream)</c>, custom writers), use the
|
||||
/// <see cref="Serialize{T}(T, System.IO.Pipelines.PipeWriter, AcBinarySerializerOptions)"/>
|
||||
/// overload.</para>
|
||||
/// </summary>
|
||||
/// <param name="value">The value to serialize; <c>null</c> writes a single null marker.</param>
|
||||
/// <param name="pipeWriter">Target pipe (typically Kestrel's transport output).</param>
|
||||
/// <param name="pipe">Target pipe — caller drains <c>pipe.Reader</c> elsewhere.</param>
|
||||
/// <param name="options">Serializer options (type wrappers, reference handling, interning, etc.).</param>
|
||||
/// <param name="waitForFlush">
|
||||
/// Per-chunk flush synchronization. <c>true</c> (default): maximum pipeline parallelism,
|
||||
/// guaranteed zero-copy, but slow consumers block the server thread (bounded by <paramref name="flushTimeout"/>).
|
||||
/// <c>false</c>: adaptive backpressure via memory threshold — safer for mixed consumer speeds.
|
||||
/// guaranteed zero-copy + zero-alloc, but slow consumers block the producer thread (bounded by
|
||||
/// <paramref name="flushTimeout"/>). <c>false</c>: adaptive backpressure via memory threshold
|
||||
/// (~64KB in-flight) — safer for mixed consumer speeds, never blocks on slow consumers.
|
||||
/// </param>
|
||||
/// <param name="flushTimeout">
|
||||
/// Per-flush timeout. <c>null</c> → wait forever (legacy). Positive value: throws
|
||||
/// Per-flush timeout. <c>null</c> → wait forever. Positive value: throws
|
||||
/// <see cref="TimeoutException"/> on stuck consumers.
|
||||
/// </param>
|
||||
/// <returns>Total serialized data bytes (excluding framing overhead).</returns>
|
||||
public static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null)
|
||||
public static int Serialize<T>(T value, System.IO.Pipelines.Pipe pipe, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null)
|
||||
{
|
||||
if (pipe is null) throw new ArgumentNullException(nameof(pipe));
|
||||
return Serialize(value, pipe.Writer, options, waitForFlush, flushTimeout);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Serialize to any <see cref="System.IO.Pipelines.PipeWriter"/> with chunked protocol framing
|
||||
/// via <see cref="AsyncPipeWriterOutput"/>. Each chunk (including the last) is framed as
|
||||
/// <c>[201][UINT16 size][data]</c> and committed to the PipeWriter via Advance (zero-copy).
|
||||
///
|
||||
/// <para><b>Flush strategy is auto-selected by writer type:</b>
|
||||
/// <c>StreamPipeWriter</c> (from <c>PipeWriter.Create(stream)</c> — NamedPipe, FileStream,
|
||||
/// NetworkStream, etc.) runs sequentially per chunk because the BCL impl resets
|
||||
/// <c>_tailMemory</c> on flush completion (race-incompatible with parallel send). All other
|
||||
/// PipeWriter implementations (Kestrel transport, custom impls) run with the safe
|
||||
/// <c>waitForFlush=true</c> default — max parallelism, zero-alloc.</para>
|
||||
///
|
||||
/// <para><b>Need runtime tuning of the flush strategy?</b> If you control the pipe yourself,
|
||||
/// build a <see cref="System.IO.Pipelines.Pipe"/> instance and use the
|
||||
/// <see cref="Serialize{T}(T, System.IO.Pipelines.Pipe, AcBinarySerializerOptions, bool, TimeSpan?)"/>
|
||||
/// overload — only Pipe-based writers can guarantee parallel-capable flush behavior.</para>
|
||||
/// </summary>
|
||||
/// <param name="value">The value to serialize; <c>null</c> writes a single null marker.</param>
|
||||
/// <param name="pipeWriter">Target pipe writer.</param>
|
||||
/// <param name="options">Serializer options (type wrappers, reference handling, interning, etc.).</param>
|
||||
/// <returns>Total serialized data bytes (excluding framing overhead).</returns>
|
||||
public static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options)
|
||||
=> Serialize(value, pipeWriter, options, waitForFlush: true, flushTimeout: null);
|
||||
|
||||
/// <summary>
|
||||
/// Internal flush-tunable PipeWriter overload — only callable from <c>AyCode.Services</c>
|
||||
/// (SignalR hub protocol) because external callers cannot safely choose <paramref name="waitForFlush"/>
|
||||
/// without knowing the concrete <see cref="System.IO.Pipelines.PipeWriter"/> implementation.
|
||||
/// SignalR uses Kestrel transport output, which is parallel-capable, and forwards the
|
||||
/// hub-protocol-options-configured tuning here.
|
||||
///
|
||||
/// <para>For the public API, see the <see cref="System.IO.Pipelines.Pipe"/> overload (parallel-capable,
|
||||
/// tuning paramters available) or the simple <see cref="System.IO.Pipelines.PipeWriter"/> overload
|
||||
/// (auto-selects strategy, no tuning).</para>
|
||||
/// </summary>
|
||||
internal static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush, TimeSpan? flushTimeout)
|
||||
{
|
||||
if (value == null)
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue