diff --git a/.claude/settings.local.json b/.claude/settings.local.json index bd8114b..acc3b01 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -64,7 +64,8 @@ "PowerShell($pluginRoots = @\\(\"H:\\\\Applications\\\\Mango\\\\Source\\\\FruitBank\\\\Presentation\\\\Nop.Web\\\\Plugins\", \"H:\\\\Applications\\\\Mango\\\\Source\\\\FruitBank\\\\Presentation\\\\Nop.Web\\\\bin\\\\Release\\\\net9.0\\\\Plugins\", \"H:\\\\Applications\\\\Mango\\\\Source\\\\FruitBank\\\\Presentation\\\\Nop.Web\\\\bin\\\\Debug\\\\net9.0\\\\Plugins\"\\); foreach \\($p in $pluginRoots\\) { if \\(Test-Path $p\\) { Write-Output \"=== $p ===\"; Get-ChildItem -Path $p -Recurse -Include \"AyCode.Services.dll\",\"AyCode.Core.dll\",\"Mango.Nop.Core.dll\",\"Nop.Plugin.Misc.FruitBankPlugin.dll\",\"Mango.Nop.Services.dll\" -ErrorAction SilentlyContinue | Select-Object LastWriteTime, Length, FullName | Sort-Object FullName | Format-Table -AutoSize -Wrap } else { Write-Output \"NOT FOUND: $p\" } })", "PowerShell($appDataPaths = @\\(\"H:\\\\Applications\\\\Mango\\\\Source\\\\FruitBank\\\\Presentation\\\\Nop.Web\\\\App_Data\\\\plugins.json\", \"H:\\\\Applications\\\\Mango\\\\Source\\\\FruitBank\\\\Presentation\\\\Nop.Web\\\\App_Data\\\\plugins.installed.json\"\\); foreach \\($f in $appDataPaths\\) { if \\(Test-Path $f\\) { Write-Output \"=== $f ===\"; Get-Content $f -Raw } else { Write-Output \"NOT FOUND: $f\" } })", "Read(//h/Applications/Mango//**)", - "Read(//h/Applications/Mango/LLM_PLAN//**)" + "Read(//h/Applications/Mango/LLM_PLAN//**)", + "Bash(curl -s \"https://raw.githubusercontent.com/dotnet/runtime/main/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs\")" ] } } diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs index 40a8103..774e0ff 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerNamedPipeTests.cs @@ -23,13 +23,13 @@ public class AcBinarySerializerNamedPipeTests { // Unique pipe name per test run to avoid cross-run interference. var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}"; - // 4096-byte chunk size = Kestrel slab default; the AsyncPipeWriterOutput on a - // StreamPipeWriter (NamedPipe-backed) currently misbehaves on chunkSize < 4096 + // 256-byte chunk size = Kestrel slab default; the AsyncPipeWriterOutput on a + // StreamPipeWriter (NamedPipe-backed) currently misbehaves on chunkSize < 256 // (ArgumentOutOfRangeException in StreamPipeWriter.Advance — pre-existing latent // issue in AsyncPipeWriterOutput, not introduced here). Tracked separately; this // test uses a known-working chunk size that still exercises framing across // multiple chunks for our 50-item payload. - var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 4096 }; + var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 }; var original = CreatePayload(50); // Start the receiver first — DeserializeFromNamedPipeAsync's synchronous prefix @@ -46,24 +46,23 @@ public class AcBinarySerializerNamedPipeTests } [TestMethod] - public async Task RoundTrip_LargeScalePayload_ChunkSize4096_StructuralEquality() + public async Task RoundTrip_LargeScalePayload_ChunkSize256_StructuralEquality() { // Production-scale payload via TestDataFactory: 100 root items × 3 pallets × 3 measurements × 4 points // = ~3700 deeply-nested objects with shared references (50 tags, 20 users, metadata, 10 categories). - // Serialized size ~few hundred KB → many chunks at chunkSize=4096 → real backpressure-driven streaming + // Serialized size ~few hundred KB → many chunks at chunkSize=256 → real backpressure-driven streaming // (PipeWriter pauseThreshold ~64KB, bytes flow incrementally as consumer drains). #if DEBUG - // Capture receiver-side state-machine trail to diagnose where the failure occurs - // relative to receiver activity. DiagnosticLog is static, so we save/restore around - // the test body to keep tests independent. + // Capture BOTH receiver and sender state to diagnose the StreamPipeWriter interaction. var diagLogs = new List(); - AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add(msg); + AsyncPipeReaderInput.DiagnosticLog = msg => diagLogs.Add($"[R] {msg}"); + AsyncPipeWriterOutput.DiagnosticLog = msg => diagLogs.Add($"[S] {msg}"); #endif try { var pipeName = $"AcBinaryTest-{Guid.NewGuid():N}"; - var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 4096 }; + var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 }; var original = TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100); var receiveTask = AcBinaryDeserializer.DeserializeFromNamedPipeAsync(pipeName, opts); @@ -88,11 +87,12 @@ public class AcBinarySerializerNamedPipeTests { #if DEBUG AsyncPipeReaderInput.DiagnosticLog = null; + AsyncPipeWriterOutput.DiagnosticLog = null; if (diagLogs.Count > 0) { - Console.WriteLine($"=== AsyncPipeReaderInput DiagnosticLog trail ({diagLogs.Count} entries) ==="); - // Print last 50 entries (most relevant to failure point) - var startIdx = Math.Max(0, diagLogs.Count - 50); + Console.WriteLine($"=== Sender [S] + Receiver [R] DiagnosticLog trail ({diagLogs.Count} entries) ==="); + // Print last 60 entries (most relevant to failure point) + var startIdx = Math.Max(0, diagLogs.Count - 60); if (startIdx > 0) Console.WriteLine($" ... ({startIdx} earlier entries elided)"); for (var i = startIdx; i < diagLogs.Count; i++) diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.NamedPipe.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.NamedPipe.cs index 76ebf25..6f5c4ba 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.NamedPipe.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.NamedPipe.cs @@ -36,10 +36,7 @@ public static partial class AcBinaryDeserializer /// (BufferWriterChunkSize × 2). /// Cancellation token. For connect-timeout, pass the token of a /// new CancellationTokenSource(timeout). - public static async Task DeserializeFromNamedPipeAsync( - string pipeName, - AcBinarySerializerOptions? options = null, - CancellationToken ct = default) + public static async Task DeserializeFromNamedPipeAsync(string pipeName, AcBinarySerializerOptions? options = null, CancellationToken ct = default) { if (pipeName is null) throw new ArgumentNullException(nameof(pipeName)); diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.NamedPipe.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.NamedPipe.cs index 379291f..19e6d32 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.NamedPipe.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.NamedPipe.cs @@ -42,12 +42,7 @@ public static partial class AcBinarySerializer /// NamedPipe server host. Defaults to "." (local machine). /// Cancellation token. For connect-timeout, pass the token of a /// new CancellationTokenSource(timeout) — uniform cancellation/timeout pattern. - public static async Task SerializeToNamedPipeAsync( - string pipeName, - T value, - AcBinarySerializerOptions? options = null, - string serverName = ".", - CancellationToken ct = default) + public static async Task SerializeToNamedPipeAsync(string pipeName, T value, AcBinarySerializerOptions? options = null, string serverName = ".", CancellationToken ct = default) { if (pipeName is null) throw new ArgumentNullException(nameof(pipeName)); if (serverName is null) throw new ArgumentNullException(nameof(serverName)); diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 26bf907..1f159dd 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -441,10 +441,7 @@ public static partial class AcBinarySerializer /// on stuck consumers. /// /// Total serialized data bytes (excluding framing overhead). - public static int Serialize( - T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, - bool waitForFlush = true, - TimeSpan? flushTimeout = null) + public static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, bool waitForFlush = true, TimeSpan? flushTimeout = null) { if (value == null) { diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs index 6ff5086..06e942a 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs @@ -34,10 +34,7 @@ public static class AsyncPipeReaderInputExtensions /// The pipe reader to drain. /// Optional cancellation token. /// If or is null. - public static async Task DrainFromAsync( - this AsyncPipeReaderInput input, - PipeReader reader, - CancellationToken cancellationToken = default) + public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken cancellationToken = default) { if (input is null) throw new ArgumentNullException(nameof(input)); if (reader is null) throw new ArgumentNullException(nameof(reader)); @@ -48,11 +45,9 @@ public static class AsyncPipeReaderInputExtensions { var result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); - foreach (var segment in result.Buffer) - input.Feed(segment.Span); + foreach (var segment in result.Buffer) input.Feed(segment.Span); reader.AdvanceTo(result.Buffer.End); - if (result.IsCompleted) break; } } diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index b54c8b4..685ff94 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -1,6 +1,8 @@ using System; using System.Buffers; using System.Buffers.Binary; +using System.Diagnostics; +using System.IO; using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -48,15 +50,36 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase /// Maximum chunk data size (UINT16 max). public const int MaxChunkSize = ushort.MaxValue; + /// + /// Cached runtime type, discovered via the public + /// factory at class-load + /// time (no magic strings, no reflection lookup, refactor-safe — if MS ever renames the + /// internal type, this auto-tracks). The dummy instance is unreachable after class init + /// and GC-collected; the static field retains only the reference. + /// + private static readonly Type StreamPipeWriterType = PipeWriter.Create(Stream.Null).GetType(); + private readonly PipeWriter _pipeWriter; private readonly int _chunkSize; private readonly bool _waitForFlush; + private readonly bool _serializeFlushAndAcquire; private readonly TimeSpan _flushTimeout; private int _committedBytes; private int _currentChunkStart; private bool _ownedBuffer; private ValueTask _lastFlush; + /// + /// Static diagnostic sink for sender-side state inspection. null by default — set + /// from tests to capture AcquireChunk / CommitCurrentChunk events with full + /// segment + bookkeeping values. is - + /// decorated, so call sites are removed in RELEASE (zero runtime cost). + /// + public static Action? DiagnosticLog; + + [Conditional("DEBUG")] + private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message); + /// Creates an output bound to the given PipeWriter with self-describing chunked framing. /// Target pipe (typically Kestrel's transport output for SignalR). /// Per-chunk data size (max ). Default 4 KB matches Kestrel's slab size. @@ -75,6 +98,13 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase // null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms). // A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller. _flushTimeout = flushTimeout ?? System.Threading.Timeout.InfiniteTimeSpan; + // StreamPipeWriter (PipeWriter.Create(Stream)) resets internal _tailMemory to default + // at FlushAsync completion — racing with the AcquireChunk-during-flush parallelism this + // class deliberately uses. For Stream-backed writers, fully await the just-started flush + // before acquiring the next chunk's memory (the writer-correct usage pattern; flush is + // a real I/O operation here). Pipe-based writers (Kestrel transport, SignalR) do NOT + // reset state on flush completion → the parallelism feature stays intact for them. + _serializeFlushAndAcquire = pipeWriter.GetType() == StreamPipeWriterType; _committedBytes = 0; _ownedBuffer = false; _lastFlush = default; @@ -122,23 +152,38 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase [MethodImpl(MethodImplOptions.NoInlining)] public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) { - // Backpressure: wait for previous flush if still in progress, - // or if committed bytes approach the Pipe's PauseWriterThreshold (~64KB) - // to prevent unbounded memory growth in waitForFlush=false mode. - if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) - SyncAwaitFlush(_lastFlush); - - CommitCurrentChunk(buffer, position); - - // Start next flush when previous is done; _lastFlush is retained for the next - // Grow / Flush to await (via SyncAwaitFlush). No .Forget() needed — calling it - // would consume the ValueTask and risk double-await when the next iteration waits. - if (_lastFlush.IsCompleted) + if (_serializeFlushAndAcquire) { - _lastFlush = _pipeWriter.FlushAsync(); + // STREAMPIPEWRITER path — sequential per chunk: commit → flush → await → acquire. + // Stream-backed writers (NamedPipe / FileStream / NetworkStream) reset internal + // state (_tailMemory) at flush completion → cannot acquire-during-flush concurrently + // (the standard Stream-PipeWriter usage pattern is await-flush-before-next-write). + // waitForFlush / _committedBytes throttling don't apply here — the writer pattern + // enforces sequentiality intrinsically. + CommitCurrentChunk(buffer, position); + SyncAwaitFlush(_pipeWriter.FlushAsync()); + } + else + { + // PIPE-BASED path (Kestrel / SignalR) — parallel sender: serializer writes the next + // chunk into the PipeWriter's buffer concurrently with the background FlushAsync. + // waitForFlush=true: backpressure — wait for the previous parallel flush before + // starting a new one (prevents unbounded in-flight flushes). + // waitForFlush=false: adaptive — skip the wait, but force-await if _committedBytes + // approaches the Pipe's PauseWriterThreshold (~64 KB), preventing runaway buffer + // growth when the consumer is slow. + // The conditional FlushAsync at the end avoids double-flush if the previous flush + // is still in progress (waitForFlush=false skip path). + if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) + SyncAwaitFlush(_lastFlush); + + CommitCurrentChunk(buffer, position); + + if (_lastFlush.IsCompleted) + _lastFlush = _pipeWriter.FlushAsync(); } - // Acquire new chunk with header reservation + // Acquire new chunk with header reservation (common to both paths). AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd); _currentChunkStart = position; } @@ -147,8 +192,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase /// Returns total serialized data bytes (excluding framing overhead). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public int GetTotalPosition(int currentPosition) - => _committedBytes + (currentPosition - _currentChunkStart); + public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart); /// /// Commits the last (partial) chunk to the PipeWriter with [201][UINT16 size] header. @@ -182,6 +226,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase buffer[headerStart] = ChunkDataMarker; BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); + EmitDiagnostic($"CommitCurrentChunk: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})"); + if (_ownedBuffer) FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); else @@ -206,12 +252,16 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase var totalRequest = dataSize + HeaderSize; var memory = _pipeWriter.GetMemory(totalRequest); + EmitDiagnostic($"AcquireChunk: requestSize={requestSize} dataSize={dataSize} totalRequest={totalRequest} memory.Length={memory.Length} _committedBytes={_committedBytes}"); + if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && segment.Array != null) { buffer = segment.Array; position = segment.Offset + HeaderSize; bufferEnd = segment.Offset + HeaderSize + dataSize; _ownedBuffer = false; + + EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]"); } else { @@ -220,6 +270,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase position = HeaderSize; bufferEnd = HeaderSize + dataSize; _ownedBuffer = true; + + EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]"); } } }