[LOADED_DOCS: 3 files, no new loads]

Refactor AsyncPipeWriterOutput for stream compatibility

- Reduce test chunk size to 256 bytes and update test names/comments
- Add sender-side diagnostic logging and unify with receiver logs
- Detect StreamPipeWriter at runtime and enforce sequential flush/acquire for streams
- Retain parallelism for pipe-based writers (Kestrel/SignalR)
- Add DEBUG-only diagnostics at key chunking points
- Minor code style cleanups and doc clarifications
- Add Bash command to fetch StreamPipeWriter.cs for reference
This commit is contained in:
Loretta 2026-04-29 00:33:35 +02:00
parent ab1af9fcfa
commit 4a8c961d87
7 changed files with 88 additions and 51 deletions

File diff suppressed because one or more lines are too long

View File

@ -23,13 +23,13 @@ public class AcBinarySerializerNamedPipeTests
{
// Unique pipe name per test run to avoid cross-run interference.
var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}";
// 4096-byte chunk size = Kestrel slab default; the AsyncPipeWriterOutput on a
// StreamPipeWriter (NamedPipe-backed) currently misbehaves on chunkSize < 4096
// 256-byte chunk size = Kestrel slab default; the AsyncPipeWriterOutput on a
// StreamPipeWriter (NamedPipe-backed) currently misbehaves on chunkSize < 256
// (ArgumentOutOfRangeException in StreamPipeWriter.Advance — pre-existing latent
// issue in AsyncPipeWriterOutput, not introduced here). Tracked separately; this
// test uses a known-working chunk size that still exercises framing across
// multiple chunks for our 50-item payload.
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 4096 };
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
var original = CreatePayload(50);
// Start the receiver first — DeserializeFromNamedPipeAsync's synchronous prefix
@ -46,24 +46,23 @@ public class AcBinarySerializerNamedPipeTests
}
[TestMethod]
public async Task RoundTrip_LargeScalePayload_ChunkSize4096_StructuralEquality()
public async Task RoundTrip_LargeScalePayload_ChunkSize256_StructuralEquality()
{
// Production-scale payload via TestDataFactory: 100 root items × 3 pallets × 3 measurements × 4 points
// = ~3700 deeply-nested objects with shared references (50 tags, 20 users, metadata, 10 categories).
// Serialized size ~few hundred KB → many chunks at chunkSize=4096 → real backpressure-driven streaming
// Serialized size ~few hundred KB → many chunks at chunkSize=256 → real backpressure-driven streaming
// (PipeWriter pauseThreshold ~64KB, bytes flow incrementally as consumer drains).
#if DEBUG
// Capture receiver-side state-machine trail to diagnose where the failure occurs
// relative to receiver activity. DiagnosticLog is static, so we save/restore around
// the test body to keep tests independent.
// Capture BOTH receiver and sender state to diagnose the StreamPipeWriter interaction.
var diagLogs = new List<string>();
AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add(msg);
AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}");
AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}");
#endif
try
{
var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}";
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 4096 };
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
var original = TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100);
var receiveTask = AcBinaryDeserializer.DeserializeFromNamedPipeAsync<TestOrder>(pipeName, opts);
@ -88,11 +87,12 @@ public class AcBinarySerializerNamedPipeTests
{
#if DEBUG
AsyncPipeReaderInput.DiagnosticLog = null;
AsyncPipeWriterOutput.DiagnosticLog = null;
if (diagLogs.Count > 0)
{
Console.WriteLine($"=== AsyncPipeReaderInput DiagnosticLog trail ({diagLogs.Count} entries) ===");
// Print last 50 entries (most relevant to failure point)
var startIdx = Math.Max(0, diagLogs.Count - 50);
Console.WriteLine($"=== Sender [S] + Receiver [R] DiagnosticLog trail ({diagLogs.Count} entries) ===");
// Print last 60 entries (most relevant to failure point)
var startIdx = Math.Max(0, diagLogs.Count - 60);
if (startIdx > 0)
Console.WriteLine($" ... ({startIdx} earlier entries elided)");
for (var i = startIdx; i < diagLogs.Count; i++)

View File

@ -36,10 +36,7 @@ public static partial class AcBinaryDeserializer
/// (<c>BufferWriterChunkSize × 2</c>).</param>
/// <param name="ct">Cancellation token. For connect-timeout, pass the token of a
/// <c>new CancellationTokenSource(timeout)</c>.</param>
public static async Task<T?> DeserializeFromNamedPipeAsync<T>(
string pipeName,
AcBinarySerializerOptions? options = null,
CancellationToken ct = default)
public static async Task<T?> DeserializeFromNamedPipeAsync<T>(string pipeName, AcBinarySerializerOptions? options = null, CancellationToken ct = default)
{
if (pipeName is null) throw new ArgumentNullException(nameof(pipeName));

View File

@ -42,12 +42,7 @@ public static partial class AcBinarySerializer
/// <param name="serverName">NamedPipe server host. Defaults to <c>"."</c> (local machine).</param>
/// <param name="ct">Cancellation token. For connect-timeout, pass the token of a
/// <c>new CancellationTokenSource(timeout)</c> — uniform cancellation/timeout pattern.</param>
public static async Task SerializeToNamedPipeAsync<T>(
string pipeName,
T value,
AcBinarySerializerOptions? options = null,
string serverName = ".",
CancellationToken ct = default)
public static async Task SerializeToNamedPipeAsync<T>(string pipeName, T value, AcBinarySerializerOptions? options = null, string serverName = ".", CancellationToken ct = default)
{
if (pipeName is null) throw new ArgumentNullException(nameof(pipeName));
if (serverName is null) throw new ArgumentNullException(nameof(serverName));

View File

@ -441,10 +441,7 @@ public static partial class AcBinarySerializer
/// <see cref="TimeoutException"/> on stuck consumers.
/// </param>
/// <returns>Total serialized data bytes (excluding framing overhead).</returns>
public static int Serialize<T>(
T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options,
bool waitForFlush = true,
TimeSpan? flushTimeout = null)
public static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{
if (value == null)
{

View File

@ -34,10 +34,7 @@ public static class AsyncPipeReaderInputExtensions
/// <param name="reader">The pipe reader to drain.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <exception cref="ArgumentNullException">If <paramref name="input"/> or <paramref name="reader"/> is <c>null</c>.</exception>
public static async Task DrainFromAsync(
this AsyncPipeReaderInput input,
PipeReader reader,
CancellationToken cancellationToken = default)
public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken cancellationToken = default)
{
if (input is null) throw new ArgumentNullException(nameof(input));
if (reader is null) throw new ArgumentNullException(nameof(reader));
@ -48,11 +45,9 @@ public static class AsyncPipeReaderInputExtensions
{
var result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
foreach (var segment in result.Buffer)
input.Feed(segment.Span);
foreach (var segment in result.Buffer) input.Feed(segment.Span);
reader.AdvanceTo(result.Buffer.End);
if (result.IsCompleted) break;
}
}

View File

@ -1,6 +1,8 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
@ -48,15 +50,36 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
/// <summary>Maximum chunk data size (UINT16 max).</summary>
public const int MaxChunkSize = ushort.MaxValue;
/// <summary>
/// Cached <see cref="StreamPipeWriter"/> runtime type, discovered via the public
/// <see cref="PipeWriter.Create(Stream, StreamPipeWriterOptions)"/> factory at class-load
/// time (no magic strings, no reflection lookup, refactor-safe — if MS ever renames the
/// internal type, this auto-tracks). The dummy instance is unreachable after class init
/// and GC-collected; the static field retains only the <see cref="Type"/> reference.
/// </summary>
private static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType();
private readonly PipeWriter _pipeWriter;
private readonly int _chunkSize;
private readonly bool _waitForFlush;
private readonly bool _serializeFlushAndAcquire;
private readonly TimeSpan _flushTimeout;
private int _committedBytes;
private int _currentChunkStart;
private bool _ownedBuffer;
private ValueTask<FlushResult> _lastFlush;
/// <summary>
/// Static diagnostic sink for sender-side state inspection. <c>null</c> by default — set
/// from tests to capture <c>AcquireChunk</c> / <c>CommitCurrentChunk</c> events with full
/// segment + bookkeeping values. <see cref="EmitDiagnostic"/> is <see cref="ConditionalAttribute"/>-
/// decorated, so call sites are removed in RELEASE (zero runtime cost).
/// </summary>
public static Action<string>? DiagnosticLog;
[Conditional("DEBUG")]
private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message);
/// <summary>Creates an output bound to the given PipeWriter with self-describing chunked framing.</summary>
/// <param name="pipeWriter">Target pipe (typically Kestrel's transport output for SignalR).</param>
/// <param name="chunkSize">Per-chunk data size (max <see cref="MaxChunkSize"/>). Default 4 KB matches Kestrel's slab size.</param>
@ -75,6 +98,13 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
// null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms).
// A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller.
_flushTimeout = flushTimeout ?? System.Threading.Timeout.InfiniteTimeSpan;
// StreamPipeWriter (PipeWriter.Create(Stream)) resets internal _tailMemory to default
// at FlushAsync completion — racing with the AcquireChunk-during-flush parallelism this
// class deliberately uses. For Stream-backed writers, fully await the just-started flush
// before acquiring the next chunk's memory (the writer-correct usage pattern; flush is
// a real I/O operation here). Pipe-based writers (Kestrel transport, SignalR) do NOT
// reset state on flush completion → the parallelism feature stays intact for them.
_serializeFlushAndAcquire = pipeWriter.GetType() == StreamPipeWriterType;
_committedBytes = 0;
_ownedBuffer = false;
_lastFlush = default;
@ -122,23 +152,38 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
[MethodImpl(MethodImplOptions.NoInlining)]
public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed)
{
// Backpressure: wait for previous flush if still in progress,
// or if committed bytes approach the Pipe's PauseWriterThreshold (~64KB)
// to prevent unbounded memory growth in waitForFlush=false mode.
if (_serializeFlushAndAcquire)
{
// STREAMPIPEWRITER path — sequential per chunk: commit → flush → await → acquire.
// Stream-backed writers (NamedPipe / FileStream / NetworkStream) reset internal
// state (_tailMemory) at flush completion → cannot acquire-during-flush concurrently
// (the standard Stream-PipeWriter usage pattern is await-flush-before-next-write).
// waitForFlush / _committedBytes throttling don't apply here — the writer pattern
// enforces sequentiality intrinsically.
CommitCurrentChunk(buffer, position);
SyncAwaitFlush(_pipeWriter.FlushAsync());
}
else
{
// PIPE-BASED path (Kestrel / SignalR) — parallel sender: serializer writes the next
// chunk into the PipeWriter's buffer concurrently with the background FlushAsync.
// waitForFlush=true: backpressure — wait for the previous parallel flush before
// starting a new one (prevents unbounded in-flight flushes).
// waitForFlush=false: adaptive — skip the wait, but force-await if _committedBytes
// approaches the Pipe's PauseWriterThreshold (~64 KB), preventing runaway buffer
// growth when the consumer is slow.
// The conditional FlushAsync at the end avoids double-flush if the previous flush
// is still in progress (waitForFlush=false skip path).
if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize)
SyncAwaitFlush(_lastFlush);
CommitCurrentChunk(buffer, position);
// Start next flush when previous is done; _lastFlush is retained for the next
// Grow / Flush to await (via SyncAwaitFlush). No .Forget() needed — calling it
// would consume the ValueTask and risk double-await when the next iteration waits.
if (_lastFlush.IsCompleted)
{
_lastFlush = _pipeWriter.FlushAsync();
}
// Acquire new chunk with header reservation
// Acquire new chunk with header reservation (common to both paths).
AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd);
_currentChunkStart = position;
}
@ -147,8 +192,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
/// Returns total serialized data bytes (excluding framing overhead).
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetTotalPosition(int currentPosition)
=> _committedBytes + (currentPosition - _currentChunkStart);
public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart);
/// <summary>
/// Commits the last (partial) chunk to the PipeWriter with [201][UINT16 size] header.
@ -182,6 +226,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
buffer[headerStart] = ChunkDataMarker;
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes);
EmitDiagnostic($"CommitCurrentChunk: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})");
if (_ownedBuffer)
FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes);
else
@ -206,12 +252,16 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
var totalRequest = dataSize + HeaderSize;
var memory = _pipeWriter.GetMemory(totalRequest);
EmitDiagnostic($"AcquireChunk: requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}");
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) && segment.Array != null)
{
buffer = segment.Array;
position = segment.Offset + HeaderSize;
bufferEnd = segment.Offset + HeaderSize + dataSize;
_ownedBuffer = false;
EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]");
}
else
{
@ -220,6 +270,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
position = HeaderSize;
bufferEnd = HeaderSize + dataSize;
_ownedBuffer = true;
EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]");
}
}
}