[LOADED_DOCS: 3 files, no new loads]
Remove SegmentBufferReader; unify on AsyncPipeReaderInput Migrates all SignalR chunked streaming receive logic to AsyncPipeReaderInput, fully removing SegmentBufferReader and SegmentBufferReaderInput from the codebase. Updates all references, deserialization paths, and documentation to reflect the new unified primitive. Marks ADR-0003 as accepted (partially executed), closes related TODOs, and clarifies protocol docs. Sets DoubleBuffered as the default FlushPolicy. No wire format or behavioral changes; all tests pass.
This commit is contained in:
parent
e7b12a1100
commit
97ac3e21a3
|
|
@ -18,7 +18,7 @@ This skill READS `.md` files and updates the LLM's `[LOADED_DOCS: ...]` state. I
|
|||
|
||||
Parse the user's most recent message (and the wider conversation tail if relevant) for concrete concepts. Examples:
|
||||
|
||||
- Class / type names: `AcLoggerBase`, `SegmentBufferReader`, `AcBinaryHubProtocol`, `<Consumer>SignalRClient` (any derived/consumer-specific type)
|
||||
- Class / type names: `AcLoggerBase`, `AsyncPipeReaderInput`, `AcBinaryHubProtocol`, `<Consumer>SignalRClient` (any derived/consumer-specific type)
|
||||
- Feature areas: "logger", "log writer", "serializer", "SignalR", "hub protocol", "chunked framing", "connection builder", "options"
|
||||
- File hints: `Program.cs`, `AcLoggerBase.cs`, `SIGNALR.md`
|
||||
- Patterns / idioms: "DI factory", "appsettings", "mode negotiation"
|
||||
|
|
|
|||
|
|
@ -16,6 +16,17 @@
|
|||
<TargetFramework>net9.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
|
||||
<!-- NativeAOT enable -->
|
||||
<PublishAot>true</PublishAot>
|
||||
<InvariantGlobalization>true</InvariantGlobalization>
|
||||
|
||||
<!-- Először tegyük zsongva: nyeljük le a trim warning-okat hogy buildelni tudjon. -->
|
||||
<!-- Ezt később vissza lehet kapcsolni szigorúra. -->
|
||||
<SuppressTrimAnalysisWarnings>true</SuppressTrimAnalysisWarnings>
|
||||
<TrimmerSingleWarn>false</TrimmerSingleWarn>
|
||||
|
||||
<JsonSerializerIsReflectionEnabledByDefault>true</JsonSerializerIsReflectionEnabledByDefault>
|
||||
</PropertyGroup>
|
||||
|
||||
</Project>
|
||||
|
|
|
|||
|
|
@ -557,7 +557,9 @@ public static class Program
|
|||
new AcBinaryBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode"),
|
||||
// Fastest Byte[] — Runtime path (UseGeneratedCode=false). Same wire/options, no source-generated dispatch.
|
||||
// Always paired with the SGen variant so every layer can compare the SGen speed-up apples-to-apples.
|
||||
new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"),
|
||||
// COMMENTED: Reflection.Emit-based dispatch crashes under NativeAOT (PlatformNotSupportedException).
|
||||
// Re-enable for JIT-mode benchmarks where SGen-vs-Runtime delta matters.
|
||||
//new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"),
|
||||
// Default preset Byte[] — RefHandling=OnlyId (deduplicates IId-shared references on the wire) +
|
||||
// UseStringInterning=All (deduplicates repeated strings). Showcases the Default preset's wire-size
|
||||
// and CPU trade-off vs FastMode on the ~20% IId-ref / repeated-string test data.
|
||||
|
|
|
|||
|
|
@ -284,15 +284,6 @@ public static partial class AcBinaryDeserializer
|
|||
return DeserializeSequence(new SequenceBinaryInput(data), targetType, options);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Deserialize from a <see cref="SegmentBufferReader"/> with streaming pipeline parallelism.
|
||||
/// The producer thread writes chunk data via <see cref="SegmentBufferReader.Write"/>,
|
||||
/// while this method (running on a background thread) deserializes incrementally,
|
||||
/// blocking on <see cref="System.Threading.ManualResetEventSlim"/> when data is exhausted.
|
||||
/// </summary>
|
||||
public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options)
|
||||
=> DeserializeSequence(new SegmentBufferReaderInput(reader), targetType, options);
|
||||
|
||||
/// <summary>
|
||||
/// Deserialize from an <see cref="AsyncPipeReaderInput"/> with streaming pipeline parallelism.
|
||||
/// The producer thread feeds chunk data via <see cref="AsyncPipeReaderInput.Feed"/>,
|
||||
|
|
|
|||
|
|
@ -555,13 +555,27 @@ public static partial class AcBinarySerializer
|
|||
{
|
||||
if (value == null)
|
||||
{
|
||||
// Null: write directly, no chunking needed
|
||||
if (!multiMessage)
|
||||
{
|
||||
// Raw single-message mode: null is just a [BinaryTypeCode.Null] byte on the wire.
|
||||
// No chunking needed, no [201]/[202] framing.
|
||||
var span = pipeWriter.GetSpan(1);
|
||||
span[0] = BinaryTypeCode.Null;
|
||||
pipeWriter.Advance(1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Framed mode (multiMessage=true): null still needs to flow through AsyncPipeWriterOutput
|
||||
// so the wire is [201][UINT16=1][BinaryTypeCode.Null][202] — well-formed chunked frame the
|
||||
// receiver can parse. Bypassing AsyncPipeWriterOutput here would emit a bare [Null] byte that
|
||||
// breaks framing for any chunked-stream consumer (e.g. AcBinaryHubProtocol AsyncSegment).
|
||||
var nullOutput = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, multiMessage: true, flushPolicy, flushTimeout);
|
||||
nullOutput.Initialize(out var nullBuf, out var nullPos, out _);
|
||||
nullBuf[nullPos++] = BinaryTypeCode.Null;
|
||||
nullOutput.Flush(nullBuf, nullPos);
|
||||
return 1;
|
||||
}
|
||||
|
||||
var runtimeType = value.GetType();
|
||||
var context = BinarySerializationContextPool<AsyncPipeWriterOutput>.Get(options);
|
||||
|
||||
|
|
|
|||
|
|
@ -6,10 +6,9 @@ namespace AyCode.Core.Serializers.Binaries;
|
|||
|
||||
/// <summary>
|
||||
/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization.
|
||||
///
|
||||
/// Self-contained <see cref="IBinaryInputBase"/> implementation that consolidates the legacy
|
||||
/// <c>SegmentBufferReader</c> + <c>SegmentBufferReaderInput</c> pair into a single sealed class
|
||||
/// (see ADR-0003 at <c>docs/adr/0003-acbinary-streaming-receive-architecture.md</c>).
|
||||
/// Self-contained <see cref="IBinaryInputBase"/> implementation — universal receive-side primitive
|
||||
/// for the AcBinary streaming framework (see ADR-0003 at
|
||||
/// <c>docs/adr/0003-acbinary-streaming-receive-architecture.md</c>).
|
||||
///
|
||||
/// The naming mirrors the send-side <c>AsyncPipeWriterOutput</c> primitive — both follow the
|
||||
/// .NET BCL convention for type-level <c>Async</c> prefix (<c>AsyncEnumerable</c>,
|
||||
|
|
|
|||
|
|
@ -29,9 +29,9 @@ namespace AyCode.Core.Serializers.Binaries;
|
|||
/// <para><b>Why not relax the deserializer's struct constraint?</b> 16+ generic declarations
|
||||
/// rely on it (<c>DeserializeSequence</c>, <c>TypeReaderTable</c>,
|
||||
/// <c>DeserializationContextPool</c> chain). Relaxing would require a wide refactor with risk
|
||||
/// to the existing perf-critical struct paths (<c>SegmentBufferReaderInput</c>,
|
||||
/// <c>ArrayBinaryInput</c>, <c>SequenceBinaryInput</c>). The ~2 ns per call savings is
|
||||
/// sub-picosecond per byte at typical chunk sizes — not worth the blast radius.</para>
|
||||
/// to the existing perf-critical struct paths (<c>ArrayBinaryInput</c>,
|
||||
/// <c>SequenceBinaryInput</c>). The ~2 ns per call savings is sub-picosecond per byte at
|
||||
/// typical chunk sizes — not worth the blast radius.</para>
|
||||
/// </summary>
|
||||
internal readonly struct AsyncPipeReaderInputAdapter : IBinaryInputBase
|
||||
{
|
||||
|
|
|
|||
|
|
@ -40,11 +40,13 @@ namespace AyCode.Core.Serializers.Binaries;
|
|||
/// Grow waits only if previous flush hasn't completed. <b>Peak memory:</b> ~chunk_size × 2.
|
||||
/// <b>Pro:</b> max producer/flush parallelism with bounded memory.
|
||||
/// <b>Con:</b> slow consumer blocks producer at next Grow (bounded by <c>flushTimeout</c>).</item>
|
||||
/// <item><see cref="FlushPolicy.Coalesced"/>: Grow() never awaits; Pipe coalesces flushes up to
|
||||
/// <c>PauseWriterThreshold</c> (~64 KB). <b>Peak memory:</b> up to PauseWriterThreshold.
|
||||
/// <b>Pro:</b> highest throughput on bounded payloads.
|
||||
/// <b>Con:</b> peak memory unbounded by chunk_size; under heavy backpressure may fall back to
|
||||
/// an owned buffer, losing zero-copy for that chunk.</item>
|
||||
/// <item><see cref="FlushPolicy.Coalesced"/>: chunks accumulate while a flush is in-flight; on
|
||||
/// the per-window safety threshold (~64 KB), the producer waits for the prior flush, then fires
|
||||
/// one batched flush covering the whole window and resets the window-counter. <b>Peak memory:</b>
|
||||
/// ~64 KB per window. <b>Pro:</b> dramatically fewer FlushAsync syscalls (one per ~64 KB window
|
||||
/// instead of one per chunk) → major throughput win on transports with non-trivial flush overhead.
|
||||
/// <b>Con:</b> per-window peak ~64 KB; under heavy backpressure may fall back to an owned buffer,
|
||||
/// losing zero-copy for that chunk.</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para><b>Flush strategy</b> auto-selects on writer type — Stream-backed PipeWriters
|
||||
|
|
@ -119,7 +121,15 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
|
|||
private readonly FlushPolicy _flushPolicy;
|
||||
private readonly bool _serializeFlushAndAcquire;
|
||||
private readonly TimeSpan _flushTimeout;
|
||||
// Cumulative total of data bytes committed since Initialize. Used by GetTotalPosition for the
|
||||
// Position property and the Serialize* methods' return value. Never resets mid-Serialize.
|
||||
private int _committedBytes;
|
||||
// Window-local counter of data bytes committed since the last FlushAsync was fired (Coalesced
|
||||
// mode safety-net). Resets when a new FlushAsync starts, so each flush window starts from 0
|
||||
// and the safety-net trip-point (~64 KB) bounds peak in-flight buffer per window — not the
|
||||
// total payload size. Without this, Coalesced would degrade to per-chunk-sync behavior after
|
||||
// the first ~15 chunks of any multi-MB payload.
|
||||
private int _unflushedBytes;
|
||||
private int _currentChunkStart;
|
||||
// Whether the current chunk's buffer is the owned ArrayPool fallback (true) or a zero-copy
|
||||
// PipeWriter slab (false). Used by CommitCurrentChunk to pick the commit strategy.
|
||||
|
|
@ -180,6 +190,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
|
|||
_serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter);
|
||||
|
||||
_committedBytes = 0;
|
||||
_unflushedBytes = 0;
|
||||
_hasOwnedBuffer = false;
|
||||
_ownedBuffer = null;
|
||||
_lastFlush = default;
|
||||
|
|
@ -215,6 +226,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
|
|||
public void Initialize(out byte[] buffer, out int position, out int bufferEnd)
|
||||
{
|
||||
_committedBytes = 0;
|
||||
_unflushedBytes = 0;
|
||||
_lastFlush = default;
|
||||
|
||||
AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd);
|
||||
|
|
@ -247,16 +259,28 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
|
|||
// the next chunk into the PipeWriter's buffer concurrently with the background FlushAsync.
|
||||
// FlushPolicy.DoubleBuffered: wait at next Grow if the previous parallel flush hasn't
|
||||
// completed yet → max two chunks in flight (~chunk_size × 2 peak memory).
|
||||
// FlushPolicy.Coalesced: skip the per-chunk wait; only block when _committedBytes
|
||||
// approaches the Pipe's PauseWriterThreshold (~64 KB), preventing runaway buffer
|
||||
// growth under a slow consumer.
|
||||
// FlushPolicy.Coalesced: skip the per-chunk wait — bytes accumulate in the PipeWriter's
|
||||
// buffer until _unflushedBytes approaches the Pipe's PauseWriterThreshold (~64 KB).
|
||||
// At that point we wait for the prior flush, then fire ONE flush that batches the
|
||||
// entire window. After firing, _unflushedBytes resets so the next window can begin.
|
||||
// This produces ~64 KB-sized flush windows instead of per-chunk flushes — fewer
|
||||
// syscalls, better network throughput.
|
||||
// The conditional FlushAsync at the end avoids double-flush if the previous flush
|
||||
// is still in progress (Coalesced skip path).
|
||||
if ((_flushPolicy == FlushPolicy.DoubleBuffered && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush);
|
||||
// is still in progress (Coalesced skip path — keep accumulating).
|
||||
if ((_flushPolicy == FlushPolicy.DoubleBuffered && !_lastFlush.IsCompleted) || _unflushedBytes > MaxChunkSize - _chunkSize) SyncAwaitFlush(_lastFlush);
|
||||
|
||||
CommitCurrentChunk(buffer, position);
|
||||
|
||||
if (_lastFlush.IsCompleted) _lastFlush = _pipeWriter.FlushAsync();
|
||||
if (_lastFlush.IsCompleted)
|
||||
{
|
||||
// The accumulated unflushed bytes are about to be moved into this flush — reset the
|
||||
// window-counter so the next flush window starts fresh. Without this reset, the
|
||||
// safety-net would trip permanently after the first ~15 chunks of a multi-MB payload,
|
||||
// degrading Coalesced to per-chunk-sync behavior. _committedBytes (cumulative total
|
||||
// for GetTotalPosition) is intentionally unchanged — only the window-counter resets.
|
||||
_unflushedBytes = 0;
|
||||
_lastFlush = _pipeWriter.FlushAsync();
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire new chunk with header reservation (common to both paths).
|
||||
|
|
@ -378,7 +402,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
|
|||
else _pipeWriter.Advance(dataBytes);
|
||||
}
|
||||
|
||||
_committedBytes += dataBytes; // only count data bytes, not framing
|
||||
// Both counters track only data bytes (not framing). _committedBytes is the cumulative
|
||||
// total used by GetTotalPosition; _unflushedBytes is the per-window counter used by the
|
||||
// Coalesced safety-net (resets each time a new FlushAsync starts).
|
||||
_committedBytes += dataBytes;
|
||||
_unflushedBytes += dataBytes;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
|
|||
|
|
@ -44,19 +44,24 @@ public enum FlushPolicy
|
|||
DoubleBuffered,
|
||||
|
||||
/// <summary>
|
||||
/// Coalesced — producer never awaits per-chunk flushes. The underlying <c>Pipe</c> coalesces
|
||||
/// flushes adaptively up to its <c>PauseWriterThreshold</c> (~64 KB committed bytes by default).
|
||||
/// Producer continues serializing in parallel with the consumer's drain; if the consumer
|
||||
/// catches up earlier, the pipe flushes sooner and the producer keeps going without waiting.
|
||||
/// <para><b>Peak memory:</b> grows up to <c>PauseWriterThreshold</c> (~64 KB) under slow
|
||||
/// consumer; close to chunk_size × 2 under fast consumer.</para>
|
||||
/// <para><b>Pro:</b> highest throughput on bounded payloads; never blocks for fast consumers;
|
||||
/// pipe-managed adaptive backpressure kicks in only when actually needed.</para>
|
||||
/// <para><b>Con:</b> peak memory unbounded by chunk_size — grows to PauseWriterThreshold under
|
||||
/// slow-consumer conditions; under heavy backpressure may fall back to an owned buffer
|
||||
/// Coalesced — producer batches up to ~64 KB worth of chunks into a single flush window.
|
||||
/// While a previous <c>FlushAsync</c> is in-flight, new chunks accumulate in the
|
||||
/// <see cref="System.IO.Pipelines.PipeWriter"/>'s buffer without firing additional flushes.
|
||||
/// When the per-window counter approaches the safety threshold (~64 KB), the producer waits
|
||||
/// for the in-flight flush to complete, then fires <b>one</b> batched flush covering the
|
||||
/// entire window. After firing, the window-counter resets and the next window begins.
|
||||
/// <para><b>Peak memory:</b> bounded per-window by ~64 KB (the in-flight chunks during one
|
||||
/// flush window). Total peak in process ≤ chunk_size × 2 + ~64 KB pipe-internal accumulation.</para>
|
||||
/// <para><b>Pro:</b> dramatically fewer FlushAsync syscalls compared to per-chunk modes —
|
||||
/// e.g. a 9.5 MB payload at 4 KB chunks fires ~150 flushes (one per ~64 KB window) instead of
|
||||
/// ~2 300 flushes (one per chunk). Major throughput win on transports where each FlushAsync
|
||||
/// has non-trivial overhead (network sockets, Kestrel WebSocket, kernel TCP buffers).</para>
|
||||
/// <para><b>Con:</b> per-window peak memory ~64 KB (vs chunk_size × 2 in
|
||||
/// <see cref="DoubleBuffered"/>); under heavy backpressure may fall back to an owned buffer
|
||||
/// (losing zero-copy for that chunk).</para>
|
||||
/// <para>Recommended when payload size is known and bounded (REST request/response, fixed-size
|
||||
/// IPC message), and the consumer is reliably fast.</para>
|
||||
/// IPC message), large enough to span multiple flush windows, and the consumer is reliably
|
||||
/// fast enough to drain the pipe between windows.</para>
|
||||
/// </summary>
|
||||
Coalesced
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,228 +0,0 @@
|
|||
using System;
|
||||
using System.Buffers;
|
||||
using System.Diagnostics;
|
||||
using System.Threading;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace AyCode.Core.Serializers.Binaries;
|
||||
|
||||
/// <summary>
|
||||
/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization.
|
||||
///
|
||||
/// Replaces <see cref="System.IO.Pipelines.Pipe"/> for the AsyncSegment read path:
|
||||
/// the main thread writes incoming chunk data via <see cref="Write"/>, while a background
|
||||
/// deserialization thread reads through <see cref="SegmentBufferReaderInput"/> which blocks
|
||||
/// on <see cref="ManualResetEventSlim"/> when data is exhausted.
|
||||
///
|
||||
/// Backed by a single contiguous <c>byte[]</c> from <see cref="ArrayPool{T}"/>.
|
||||
/// Positions reset to 0 when the consumer catches up (zero-cost reuse).
|
||||
/// Grow is the absolute last resort (single Rent, practically never happens).
|
||||
///
|
||||
/// Thread-safety:
|
||||
/// <list type="bullet">
|
||||
/// <item><c>_writePos</c>: written by producer (Volatile.Write), read by consumer (Volatile.Read).</item>
|
||||
/// <item><c>_readPos</c>: written by consumer (Volatile.Write), read by producer (Volatile.Read).</item>
|
||||
/// <item>Reset-to-0 happens in <see cref="Write"/> only when <c>_readPos == _writePos</c>
|
||||
/// (consumer is blocked in TryAdvanceSegment, not reading the buffer).</item>
|
||||
/// <item>Grow happens in <see cref="Write"/> only when reset is insufficient
|
||||
/// (consumer is behind). Old buffer kept for consumer's local reference;
|
||||
/// <see cref="SegmentBufferReaderInput.TryAdvanceSegment"/> picks up new buffer.</item>
|
||||
/// </list>
|
||||
/// </summary>
|
||||
public sealed class SegmentBufferReader : IDisposable
|
||||
{
|
||||
private byte[] _buffer;
|
||||
private int _writePos;
|
||||
private int _readPos; // consumer reports consumed position here
|
||||
private bool _completed;
|
||||
|
||||
private readonly ManualResetEventSlim _dataAvailable;
|
||||
private readonly ILogger? _logger;
|
||||
|
||||
// After grow: ALL old buffers are kept alive until Dispose.
|
||||
// Cannot return them to the pool mid-operation because the consumer thread
|
||||
// may hold a local reference to any of them (its local 'buffer' variable is
|
||||
// only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind).
|
||||
private byte[][]? _oldBuffers;
|
||||
private int _oldBufferCount;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new SegmentBufferReader with the specified initial capacity.
|
||||
/// Recommended: <c>options.BufferWriterChunkSize * 2</c> (e.g. 8 KB for the SignalR-context 4 KB chunk size,
|
||||
/// 128 KB for the standalone 64 KB default). See <see cref="SegmentBufferReaderInput"/> class remarks
|
||||
/// for the full sizing rationale (two-chunks-worth headroom + reset-to-0 cycling).
|
||||
/// </summary>
|
||||
/// <param name="initialCapacity">Initial buffer size. Rounded up by ArrayPool.</param>
|
||||
/// <param name="logger">Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds.</param>
|
||||
public SegmentBufferReader(int initialCapacity, ILogger? logger = null)
|
||||
{
|
||||
if (initialCapacity <= 0)
|
||||
throw new ArgumentOutOfRangeException(nameof(initialCapacity));
|
||||
|
||||
_buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
|
||||
_dataAvailable = new ManualResetEventSlim(false);
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
// --- Producer API (main thread) ---
|
||||
|
||||
/// <summary>
|
||||
/// Appends chunk data to the buffer.
|
||||
/// Resets positions to 0 when consumer has caught up (zero-cost).
|
||||
/// Grows as last resort if consumer is behind and buffer is full.
|
||||
/// Signals the consumer thread that new data is available.
|
||||
/// </summary>
|
||||
public void Write(ReadOnlySpan<byte> data)
|
||||
{
|
||||
if (data.IsEmpty) return;
|
||||
|
||||
// If consumer consumed everything → reset positions to 0 (zero-cost reuse)
|
||||
var rp = Volatile.Read(ref _readPos);
|
||||
if (rp > 0 && rp == _writePos)
|
||||
{
|
||||
DebugLog("Write reset positions rp={Rp} wp={Wp} → 0", rp, _writePos);
|
||||
_writePos = 0;
|
||||
Volatile.Write(ref _readPos, 0);
|
||||
}
|
||||
|
||||
// Grow if buffer can't fit the new data (rare — consumer typically keeps pace)
|
||||
if (_writePos + data.Length > _buffer.Length)
|
||||
{
|
||||
DebugLog("Write grow required wp={Wp} dataLen={DataLen} bufLen={BufLen}",
|
||||
_writePos, data.Length, _buffer.Length);
|
||||
Grow(_writePos + data.Length);
|
||||
}
|
||||
|
||||
data.CopyTo(_buffer.AsSpan(_writePos));
|
||||
var newWritePos = _writePos + data.Length;
|
||||
Volatile.Write(ref _writePos, newWritePos);
|
||||
_dataAvailable.Set();
|
||||
|
||||
DebugLog("Write dataLen={DataLen} newWritePos={NewWritePos} readPos={ReadPos}",
|
||||
data.Length, newWritePos, Volatile.Read(ref _readPos));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Signals that no more data will be written (CHUNK_END received).
|
||||
/// The consumer's <see cref="SegmentBufferReaderInput.TryAdvanceSegment"/> will return false
|
||||
/// once all buffered data is consumed.
|
||||
/// </summary>
|
||||
public void Complete()
|
||||
{
|
||||
Volatile.Write(ref _completed, true);
|
||||
_dataAvailable.Set();
|
||||
|
||||
DebugLog("Complete writePos={Wp} readPos={Rp}",
|
||||
Volatile.Read(ref _writePos), Volatile.Read(ref _readPos));
|
||||
}
|
||||
|
||||
// --- Consumer API (deser thread, used by SegmentBufferReaderInput) ---
|
||||
|
||||
/// <summary>Current buffer array. May change after grow — consumer must re-read in TryAdvanceSegment.</summary>
|
||||
internal byte[] Buffer => _buffer;
|
||||
|
||||
/// <summary>Current write position. All bytes in [ReadPos..WritePos) are valid.</summary>
|
||||
internal int WritePos => Volatile.Read(ref _writePos);
|
||||
|
||||
/// <summary>Consumer's last reported read position.</summary>
|
||||
internal int ReadPos => Volatile.Read(ref _readPos);
|
||||
|
||||
/// <summary>True after <see cref="Complete"/> is called.</summary>
|
||||
internal bool IsCompleted => Volatile.Read(ref _completed);
|
||||
|
||||
/// <summary>
|
||||
/// Called by consumer to report how far it has read.
|
||||
/// Enables the producer to reset positions to 0 when everything is consumed.
|
||||
/// </summary>
|
||||
internal void SetReadPos(int position) => Volatile.Write(ref _readPos, position);
|
||||
|
||||
/// <summary>Blocks until new data is written or <see cref="Complete"/> is called.</summary>
|
||||
internal void WaitForData() => _dataAvailable.Wait();
|
||||
|
||||
/// <summary>
|
||||
/// Resets the signal for the double-check pattern:
|
||||
/// <c>ResetSignal() → check condition → if false, WaitForData()</c>.
|
||||
/// </summary>
|
||||
internal void ResetSignal() => _dataAvailable.Reset();
|
||||
|
||||
// --- Lifecycle ---
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
// Return all old buffers accumulated from grows
|
||||
if (_oldBuffers != null)
|
||||
{
|
||||
for (var i = 0; i < _oldBufferCount; i++)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(_oldBuffers[i]);
|
||||
_oldBuffers[i] = null!;
|
||||
}
|
||||
_oldBuffers = null;
|
||||
_oldBufferCount = 0;
|
||||
}
|
||||
|
||||
// Return current buffer
|
||||
if (_buffer != null!)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(_buffer);
|
||||
_buffer = null!;
|
||||
}
|
||||
|
||||
_dataAvailable.Dispose();
|
||||
}
|
||||
|
||||
// --- Internal ---
|
||||
|
||||
private void Grow(int requiredCapacity)
|
||||
{
|
||||
var newSize = Math.Max(_buffer.Length * 2, requiredCapacity);
|
||||
var newBuffer = ArrayPool<byte>.Shared.Rent(newSize);
|
||||
System.Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos);
|
||||
|
||||
// Keep the current buffer alive — consumer's local 'buffer' variable may still reference it
|
||||
// (consumer may lag multiple grows behind before calling TryAdvanceSegment).
|
||||
// Returning old buffers to the pool mid-operation would cause use-after-free
|
||||
// if another pool user overwrites them while the consumer is still reading.
|
||||
if (_oldBuffers == null)
|
||||
_oldBuffers = new byte[4][];
|
||||
else if (_oldBufferCount == _oldBuffers.Length)
|
||||
Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2);
|
||||
|
||||
_oldBuffers[_oldBufferCount++] = _buffer;
|
||||
_buffer = newBuffer;
|
||||
}
|
||||
|
||||
// --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) ---
|
||||
|
||||
/// <summary>
|
||||
/// Emits a debug log if the logger is attached and Debug level is enabled.
|
||||
/// Compiled out entirely in RELEASE builds via <see cref="ConditionalAttribute"/>.
|
||||
/// </summary>
|
||||
[Conditional("DEBUG")]
|
||||
internal void DebugLog(string message)
|
||||
{
|
||||
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
|
||||
_logger.LogDebug("SegmentBufferReader " + message);
|
||||
}
|
||||
|
||||
[Conditional("DEBUG")]
|
||||
private void DebugLog(string template, object? arg0)
|
||||
{
|
||||
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
|
||||
_logger.LogDebug("SegmentBufferReader " + template, arg0);
|
||||
}
|
||||
|
||||
[Conditional("DEBUG")]
|
||||
private void DebugLog(string template, object? arg0, object? arg1)
|
||||
{
|
||||
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
|
||||
_logger.LogDebug("SegmentBufferReader " + template, arg0, arg1);
|
||||
}
|
||||
|
||||
[Conditional("DEBUG")]
|
||||
private void DebugLog(string template, object? arg0, object? arg1, object? arg2)
|
||||
{
|
||||
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
|
||||
_logger.LogDebug("SegmentBufferReader " + template, arg0, arg1, arg2);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,130 +0,0 @@
|
|||
using System.Diagnostics;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
|
||||
namespace AyCode.Core.Serializers.Binaries;
|
||||
|
||||
/// <summary>
|
||||
/// Binary input that reads from a <see cref="SegmentBufferReader"/> for chunked streaming deserialization.
|
||||
///
|
||||
/// Replaces <c>PipeReaderBinaryInput</c>: instead of blocking on <c>PipeReader.ReadAsync()</c>,
|
||||
/// blocks on <see cref="ManualResetEventSlim"/> when data is exhausted. Much simpler because
|
||||
/// the buffer is a single contiguous <c>byte[]</c> — no multi-segment iteration, no cross-boundary
|
||||
/// scratch buffers.
|
||||
///
|
||||
/// The deserialization context's hot path reads directly from the buffer array using local
|
||||
/// <c>buffer</c>/<c>position</c>/<c>bufferLength</c> variables. <see cref="TryAdvanceSegment"/>
|
||||
/// is only called when <c>position >= bufferLength</c> (cold path), at which point it reports
|
||||
/// the consumed position via <see cref="SegmentBufferReader.SetReadPos"/>, then either
|
||||
/// provides more data or blocks until data arrives.
|
||||
///
|
||||
/// Position reset: when the producer detects <c>readPos == writePos</c> (all consumed),
|
||||
/// it resets both to 0. After waking from Wait, this input re-reads the adjusted positions.
|
||||
///
|
||||
/// <para>
|
||||
/// <b>Recommended <see cref="SegmentBufferReader"/> <c>initialCapacity</c></b>:
|
||||
/// <c>options.BufferWriterChunkSize * 2</c> (typically 8 KB for the SignalR-context 4 KB chunk size,
|
||||
/// 128 KB for the standalone 64 KB default). Sized to fit two chunks worth of in-flight bytes —
|
||||
/// enough headroom for the producer to write the next chunk while the consumer is reading the
|
||||
/// previous, with reset-to-0 cycling reusing the same buffer for the message's lifetime regardless
|
||||
/// of total payload size. Larger values waste memory; smaller values trigger occasional grows
|
||||
/// under burst-write conditions.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public struct SegmentBufferReaderInput : IBinaryInputBase
|
||||
{
|
||||
private readonly SegmentBufferReader _reader;
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public SegmentBufferReaderInput(SegmentBufferReader reader)
|
||||
{
|
||||
_reader = reader;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Provides the initial buffer state. Called once before deserialization begins.
|
||||
/// Task.Run starts after the first Write() (lazy start in TryParseChunkData),
|
||||
/// so data is already available — no wait needed.
|
||||
/// </summary>
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public void Initialize(out byte[] buffer, out int position, out int bufferLength)
|
||||
{
|
||||
buffer = _reader.Buffer;
|
||||
position = 0;
|
||||
bufferLength = _reader.WritePos;
|
||||
DebugLog("Input.Initialize bufferLength=" + bufferLength);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Called when the deserialization context needs more bytes than currently available.
|
||||
/// Reports consumed position to the producer, then blocks via <see cref="ManualResetEventSlim"/>
|
||||
/// until enough data arrives or completion is signaled.
|
||||
///
|
||||
/// Uses the double-check pattern to avoid missed signals:
|
||||
/// <c>Reset() → check → if still not enough, Wait()</c>.
|
||||
///
|
||||
/// No cross-boundary handling needed — the buffer is a single contiguous <c>byte[]</c>.
|
||||
/// After grow, re-reads <c>_reader.Buffer</c> to get the new (larger) array.
|
||||
/// After position reset (readPos/writePos set to 0 by producer), re-reads adjusted positions.
|
||||
/// </summary>
|
||||
[MethodImpl(MethodImplOptions.NoInlining)]
|
||||
public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
|
||||
{
|
||||
DebugLog("Input.TryAdvanceSegment enter position=" + position + " bufferLength=" + bufferLength + " needed=" + needed);
|
||||
|
||||
// Report how far we've consumed — enables producer to reset positions to 0
|
||||
_reader.SetReadPos(position);
|
||||
|
||||
while (true)
|
||||
{
|
||||
// Re-read positions (may have been reset to 0 by producer)
|
||||
int rp = _reader.ReadPos;
|
||||
int wp = _reader.WritePos;
|
||||
|
||||
if (wp - rp >= needed)
|
||||
{
|
||||
buffer = _reader.Buffer; // may be new array after grow
|
||||
position = rp; // may be 0 after reset
|
||||
bufferLength = wp;
|
||||
DebugLog("Input.TryAdvanceSegment return true (data available) position=" + position + " bufferLength=" + bufferLength);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (_reader.IsCompleted)
|
||||
{
|
||||
// No more data will arrive. Return whatever is left.
|
||||
if (wp > rp)
|
||||
{
|
||||
buffer = _reader.Buffer;
|
||||
position = rp;
|
||||
bufferLength = wp;
|
||||
DebugLog("Input.TryAdvanceSegment return true (completed, partial) position=" + position + " bufferLength=" + bufferLength);
|
||||
return true;
|
||||
}
|
||||
DebugLog("Input.TryAdvanceSegment return false (completed, empty)");
|
||||
return false; // end of input
|
||||
}
|
||||
|
||||
// Double-check pattern: Reset → verify → Wait
|
||||
_reader.ResetSignal();
|
||||
|
||||
rp = _reader.ReadPos;
|
||||
wp = _reader.WritePos;
|
||||
if (wp - rp >= needed || _reader.IsCompleted)
|
||||
continue; // re-check from top
|
||||
|
||||
DebugLog("Input.TryAdvanceSegment waiting (wp=" + wp + " rp=" + rp + " needed=" + needed + ")");
|
||||
_reader.WaitForData(); // ManualResetEventSlim.Wait()
|
||||
DebugLog("Input.TryAdvanceSegment woke up");
|
||||
}
|
||||
}
|
||||
|
||||
[Conditional("DEBUG")]
|
||||
private void DebugLog(string message) => _reader.DebugLog(message);
|
||||
|
||||
/// <summary>
|
||||
/// No-op. Buffer lifecycle is managed by <see cref="SegmentBufferReader.Dispose"/>.
|
||||
/// </summary>
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public void Release() { }
|
||||
}
|
||||
|
|
@ -203,7 +203,7 @@ Once registered, controllers using `[FromBody] T model` and `IActionResult` auto
|
|||
|
||||
- **InputFormatter shape**: `AcBinaryInputFormatter : InputFormatter` with `SupportedMediaTypes.Add("application/x-acbinary")`. The `ReadRequestBodyAsync` reads from `context.HttpContext.Request.BodyReader` (PipeReader) — either drains to byte[] for simple cases (size-bounded by `[Request].MaxAllowedSize`), or uses the `AsyncPipeReaderInput` + drain-task pattern for low-memory streaming on huge payloads. Decide which is the default; expose the streaming variant as opt-in.
|
||||
|
||||
- **OutputFormatter shape**: `AcBinaryOutputFormatter : OutputFormatter` writing to `context.HttpContext.Response.BodyWriter` (PipeWriter). The `WriteResponseBodyAsync` calls `AcBinarySerializer.SerializeChunked(value, pipeWriter, options)` (raw — single-message-per-request, no [201]/[202] framing needed). Optional `FlushPolicy.Coalesced` tuning for higher throughput at the cost of owned-buffer fallback risk.
|
||||
- **OutputFormatter shape**: `AcBinaryOutputFormatter : OutputFormatter` writing to `context.HttpContext.Response.BodyWriter` (PipeWriter). The `WriteResponseBodyAsync` calls `AcBinarySerializer.SerializeChunked(value, pipeWriter, options)` (raw — single-message-per-request, no [201]/[202] framing needed). Optional `FlushPolicy.Coalesced` tuning batches flushes into ~64 KB windows for higher throughput, at the cost of owned-buffer fallback risk under heavy backpressure.
|
||||
|
||||
- **Wire-format choice**: raw chunked stream (`SerializeChunked`) is the natural fit for HTTP single-request-single-response. The multi-message framed variant (`SerializeChunkedFramed`) is over-engineered for REST — there's no concept of "next message on this stream" within a single HTTP request. Document this choice clearly.
|
||||
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
|
@ -24,12 +24,9 @@ public static class AcSignalRServerProtocolExtensions
|
|||
/// <item>The optional <paramref name="configure"/> callback — overrides DI values inline</item>
|
||||
/// </list>
|
||||
/// </summary>
|
||||
public static ISignalRServerBuilder AddAcBinaryProtocol(
|
||||
this ISignalRServerBuilder builder,
|
||||
Action<AcBinaryHubProtocolOptions>? configure = null)
|
||||
public static ISignalRServerBuilder AddAcBinaryProtocol(this ISignalRServerBuilder builder, Action<AcBinaryHubProtocolOptions>? configure = null)
|
||||
{
|
||||
builder.Services.AddSingleton<IHubProtocol>(sp =>
|
||||
AcSignalRProtocolExtensions.BuildProtocol(sp, configure));
|
||||
builder.Services.AddSingleton<IHubProtocol>(sp => AcSignalRProtocolExtensions.BuildProtocol(sp, configure));
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,9 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
|
|||
public override async Task OnConnectedAsync()
|
||||
{
|
||||
InitDiagnosticLoggerIfNeeded();
|
||||
|
||||
Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}");
|
||||
|
||||
LogContextUserNameAndId();
|
||||
await base.OnConnectedAsync();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,9 +64,11 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
/// <list type="bullet">
|
||||
/// <item>Send path: <c>AsyncSegment</c> is unsupported (sync-over-async flush blocks the single UI thread).</item>
|
||||
/// <item>Receive path: when chunked wire arrives, background <c>Task.Run</c> is skipped;
|
||||
/// the deserializer runs synchronously on <c>CHUNK_END</c> over the already-buffered data
|
||||
/// (<see cref="SegmentBufferReader"/>'s <c>ManualResetEventSlim.Wait()</c> would throw
|
||||
/// <see cref="PlatformNotSupportedException"/>).</item>
|
||||
/// the deserializer runs synchronously on <c>CHUNK_END</c> over the already-buffered
|
||||
/// <see cref="AsyncPipeReaderInput"/>. After <c>Complete()</c>, the input's
|
||||
/// <c>TryAdvanceSegment</c> never blocks on <c>ManualResetEventSlim.Wait()</c> (which would
|
||||
/// throw <see cref="PlatformNotSupportedException"/> on WASM) — it returns buffered data
|
||||
/// immediately and signals end-of-stream when exhausted.</item>
|
||||
/// </list>
|
||||
/// </para>
|
||||
/// </summary>
|
||||
|
|
@ -99,7 +101,7 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
public object?[] Args = null!;
|
||||
public int StreamedArgIndex;
|
||||
public Type StreamedArgType = null!;
|
||||
public SegmentBufferReader Buffer = null!;
|
||||
public AsyncPipeReaderInput Input = null!;
|
||||
public Task<object?>? DeserTask;
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -429,9 +431,22 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
|
||||
/// <summary>
|
||||
/// Writes a message using chunked protocol framing for AsyncSegment mode.
|
||||
/// CHUNK_START: standard SignalR framed message with INT32 -1 for the streamed arg.
|
||||
/// CHUNK_DATA: [201][UINT16 size][data] per chunk (written by AsyncPipeWriterOutput, zero-copy).
|
||||
/// CHUNK_END: [202] (1 byte, no data — all data already committed by output).
|
||||
/// <para>The two phases:</para>
|
||||
/// <list type="bullet">
|
||||
/// <item><b>CHUNK_START envelope</b> — standard SignalR framed message
|
||||
/// (<c>[INT32 length][200 marker][header][args except streamedArg as INT32 -1]</c>),
|
||||
/// written here via <see cref="BufferWriterBinaryOutput"/>.</item>
|
||||
/// <item><b>CHUNK_DATA + CHUNK_END</b> — fully owned by <see cref="AsyncPipeWriterOutput"/>
|
||||
/// (invoked through <see cref="AcBinarySerializer.Serialize{T}(T, PipeWriter, AcBinarySerializerOptions, FlushPolicy, TimeSpan?)"/>):
|
||||
/// emits <c>[201][UINT16 size][data]</c> per chunk + <c>[202]</c> end marker + final
|
||||
/// <c>FlushAsync</c>. This protocol layer no longer writes <c>[201]</c>/<c>[202]</c>
|
||||
/// bytes or calls <c>FlushAsync</c> after the streamed-arg serialize — those are the
|
||||
/// streaming primitive's responsibility (see <c>BINARY_ASYNCPIPE</c> docs).</item>
|
||||
/// </list>
|
||||
/// <para>For <c>streamedArg == null</c>, <see cref="AcBinarySerializer.Serialize{T}"/> still
|
||||
/// drives <see cref="AsyncPipeWriterOutput"/> in framed mode — wire is
|
||||
/// <c>[201][UINT16=1][Null][202]</c>, deserializing back to <c>null</c>. No special-casing
|
||||
/// needed in this layer.</para>
|
||||
/// </summary>
|
||||
private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter)
|
||||
{
|
||||
|
|
@ -506,24 +521,20 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
|
||||
SyncFlush(pipeWriter.FlushAsync());
|
||||
|
||||
// --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
|
||||
if (streamedArg != null)
|
||||
{
|
||||
// --- CHUNK_DATA + CHUNK_END (fully delegated to AsyncPipeWriterOutput) ---
|
||||
// AsyncPipeWriterOutput in framed mode owns the entire chunked-stream emission:
|
||||
// - [201][UINT16 size][data] per chunk
|
||||
// - [202] CHUNK_END marker
|
||||
// - final FlushAsync
|
||||
// This includes the null streamedArg case (since the AcBinarySerializer null-bypass for
|
||||
// multiMessage=true was removed) — wire is [201][UINT16=1][Null][202], deserialized back to null.
|
||||
// No manual [202] write or extra FlushAsync needed in this layer.
|
||||
dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _flushPolicy, _flushTimeout);
|
||||
_logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes);
|
||||
}
|
||||
|
||||
// --- CHUNK_END [202] ---
|
||||
var endByte = pipeWriter.GetSpan(1);
|
||||
endByte[0] = MsgAsyncChunkEnd;
|
||||
pipeWriter.Advance(1);
|
||||
|
||||
SyncFlush(pipeWriter.FlushAsync());
|
||||
|
||||
_logger?.LogTrace("WriteMessageChunked CHUNK_END written");
|
||||
_logger?.LogDebug("WriteMessageChunked CHUNK_DATA + CHUNK_END emitted via AsyncPipeWriterOutput dataBytes={DataBytes}", dataBytes);
|
||||
|
||||
// Total wire bytes = length prefix (4) + CHUNK_START payload + CHUNK_DATA frames + CHUNK_END (1)
|
||||
// Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data
|
||||
// Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data.
|
||||
// The +1 at the end is the [202] CHUNK_END marker (now written by AsyncPipeWriterOutput.Flush()).
|
||||
var chunkSize = _options.BufferWriterChunkSize;
|
||||
var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0;
|
||||
var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1;
|
||||
|
|
@ -566,7 +577,7 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
// the buffer may still contain:
|
||||
// 1. The already-processed CHUNK_START frame
|
||||
// 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously)
|
||||
// Skip both to avoid duplicate writes to state.Buffer.
|
||||
// Skip both to avoid duplicate writes to state.Input.
|
||||
if (TrySkipRepresentedChunkStart(ref input))
|
||||
{
|
||||
_logger?.LogDebug("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length);
|
||||
|
|
@ -864,29 +875,31 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
|
||||
_logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length);
|
||||
|
||||
// Write chunk data to SegmentBufferReader for background deserialization
|
||||
// Feed chunk data into AsyncPipeReaderInput for background deserialization.
|
||||
// Note: the input is multiMessage:false — we strip framing here and pass raw data.
|
||||
if (chunkDataSize > 0)
|
||||
{
|
||||
var dataSlice = input.Slice(3, chunkDataSize);
|
||||
foreach (var segment in dataSlice)
|
||||
state.Buffer.Write(segment.Span);
|
||||
state.Input.Feed(segment.Span);
|
||||
}
|
||||
|
||||
// Lazy start: begin background deserialization after first chunk is written.
|
||||
// SegmentBufferReaderInput.Initialize reads the already-written data immediately.
|
||||
// Browser fallback: skip Task.Run — SegmentBufferReader.WaitForData relies on
|
||||
// ManualResetEventSlim.Wait which throws PlatformNotSupportedException on WASM.
|
||||
// Instead, buffer all chunks and run the deserializer synchronously on CHUNK_END,
|
||||
// where state.Buffer.Complete() has already been called and no wait is needed.
|
||||
// The deser task reads via AsyncPipeReaderInputAdapter (struct over class) which
|
||||
// calls TryAdvanceSegment on the input — blocks on ManualResetEventSlim.Wait when
|
||||
// out of data. Browser fallback: skip Task.Run — the MRES.Wait throws
|
||||
// PlatformNotSupportedException on WASM. Instead, buffer all chunks and run the
|
||||
// deserializer synchronously on CHUNK_END, where state.Input.Complete() has
|
||||
// already been called → TryAdvanceSegment never enters the Wait path.
|
||||
if (state.DeserTask == null && !IsBrowser)
|
||||
{
|
||||
_logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name);
|
||||
|
||||
var reader = state.Buffer;
|
||||
var input2 = state.Input;
|
||||
var type = state.StreamedArgType;
|
||||
var opts = _options;
|
||||
|
||||
state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(reader, type, opts));
|
||||
state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input2, type, opts));
|
||||
}
|
||||
|
||||
input = input.Slice(totalNeeded);
|
||||
|
|
@ -899,7 +912,7 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
_logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion");
|
||||
|
||||
// Signal end of data → background deser task completes
|
||||
state.Buffer.Complete();
|
||||
state.Input.Complete();
|
||||
object? deserializedArg = null;
|
||||
|
||||
try
|
||||
|
|
@ -912,14 +925,15 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
}
|
||||
else
|
||||
{
|
||||
// Browser (WASM) fallback: all chunks are buffered into a single contiguous byte[]
|
||||
// inside SegmentBufferReader. Use ArrayBinaryInput via the offset-aware overload —
|
||||
// strictly faster than SegmentBufferReaderInput here (JIT eliminates
|
||||
// TryAdvanceSegment, no volatile reads, no cross-boundary branching).
|
||||
// Browser (WASM) fallback: run the deserializer synchronously on the
|
||||
// already-buffered input. After Complete() the input's TryAdvanceSegment
|
||||
// returns buffered data immediately and never blocks on
|
||||
// ManualResetEventSlim.Wait (which would throw PlatformNotSupportedException
|
||||
// on WASM). Same struct-adapter path the background task uses; small JIT-
|
||||
// inlined indirection vs. the previous direct byte[] overload — negligible
|
||||
// per-message, and removes the WASM-specific buffer-mutation access.
|
||||
deserializedArg = AcBinaryDeserializer.Deserialize(
|
||||
state.Buffer.Buffer,
|
||||
0,
|
||||
state.Buffer.WritePos,
|
||||
state.Input,
|
||||
state.StreamedArgType,
|
||||
_options);
|
||||
}
|
||||
|
|
@ -936,8 +950,8 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
}
|
||||
finally
|
||||
{
|
||||
_logger?.LogDebug("TryParseChunkData [202] cleanup: Buffer.Dispose + _chunkStates.Remove");
|
||||
state.Buffer.Dispose();
|
||||
_logger?.LogDebug("TryParseChunkData [202] cleanup: Input.Dispose + _chunkStates.Remove");
|
||||
state.Input.Dispose();
|
||||
_chunkStates.Remove(binder);
|
||||
}
|
||||
|
||||
|
|
@ -950,17 +964,19 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
return true;
|
||||
}
|
||||
|
||||
// Unknown byte in chunk mode — break out (shouldn't happen)
|
||||
// Unknown byte in chunk mode — break out (shouldn't happen).
|
||||
// Note: AsyncPipeReaderInput's WritePos/ReadPos are private, so the previous diagnostic
|
||||
// fields are unavailable here. Enable AsyncPipeReaderInput.DiagnosticLog (DEBUG-only)
|
||||
// for deeper instrumentation when investigating framing-state corruption.
|
||||
_logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking. " +
|
||||
"binderHash={BinderHash} inputLength={InputLength} " +
|
||||
"state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} bufferWritePos={WritePos} bufferReadPos={ReadPos}",
|
||||
"state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} chunkFrameBytesConsumed={ChunkFrameBytesConsumed}",
|
||||
firstByte,
|
||||
binder.GetHashCode(),
|
||||
input.Length,
|
||||
state.StreamedArgType.Name,
|
||||
state.DeserTask?.Status.ToString() ?? "null",
|
||||
state.Buffer.WritePos,
|
||||
state.Buffer.ReadPos);
|
||||
state.ChunkFrameBytesConsumed);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
@ -969,7 +985,7 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
|
||||
/// <summary>
|
||||
/// Parses CHUNK_START: reads original message (with -1 marker for streamed arg),
|
||||
/// creates SegmentBufferReader, stores state. Background deser task starts lazily on first chunk.
|
||||
/// creates <see cref="AsyncPipeReaderInput"/>, stores state. Background deser task starts lazily on first chunk.
|
||||
/// Returns null to signal "consumed bytes, no complete message yet".
|
||||
/// </summary>
|
||||
private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder)
|
||||
|
|
@ -1012,7 +1028,10 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
StreamedArgIndex = streamedIndex,
|
||||
StreamedArgType = streamedType,
|
||||
HeaderContext = headerContext,
|
||||
Buffer = new SegmentBufferReader(_options.BufferWriterChunkSize * 2, _logger)
|
||||
// multiMessage: false — SignalR's TryParseChunkData parses [201]/[202] framing externally
|
||||
// and feeds raw data bytes into the input. The framing-state-machine inside
|
||||
// AsyncPipeReaderInput is not used on this code path.
|
||||
Input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false)
|
||||
// DeserTask started lazily in TryParseChunkData after first chunk is written
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ public sealed class AcBinaryHubProtocolOptions
|
|||
/// </list>
|
||||
/// Ignored for Bytes and Segment modes.
|
||||
/// </summary>
|
||||
public FlushPolicy FlushPolicy { get; set; } = FlushPolicy.Coalesced;
|
||||
public FlushPolicy FlushPolicy { get; set; } = FlushPolicy.DoubleBuffered;
|
||||
|
||||
/// <summary>
|
||||
/// Maximum wait for a single synchronous <c>FlushAsync</c> before throwing
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -13,7 +13,7 @@ For higher-level SignalR abstractions see `../SIGNALR/SIGNALR_ISSUES.md`.
|
|||
### Known workaround (multi-layer)
|
||||
1. **`AcBinaryHubProtocolOptions.Validate()`** throws `PlatformNotSupportedException` if WASM + AsyncSegment combination is requested → prevents deadlock
|
||||
2. **Consumer code-level safety-net** downgrades `AsyncSegment → Segment` on WASM (see `FruitBankHybrid.Web.Client/Program.cs` Configure lambda)
|
||||
3. **Receive-path** on WASM is fully supported — `SegmentBufferReaderInput` with synchronous fallback at `CHUNK_END` means WASM clients CAN receive AsyncSegment-chunked data from a non-WASM sender, they just cannot send AsyncSegment themselves
|
||||
3. **Receive-path** on WASM is fully supported — `AsyncPipeReaderInput` with synchronous fallback at `CHUNK_END` (after `Complete()`, `TryAdvanceSegment` never enters the `MRES.Wait` path) means WASM clients CAN receive AsyncSegment-chunked data from a non-WASM sender, they just cannot send AsyncSegment themselves
|
||||
|
||||
### Related TODO
|
||||
None — architectural constraint of browser WASM threading model.
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -2,6 +2,23 @@
|
|||
|
||||
## Status
|
||||
|
||||
**Accepted (2026-05-03), partially executed** — Steps 1–3 + Step 6 delivered; Steps 4 & 5 dropped during execution.
|
||||
|
||||
### Execution log
|
||||
|
||||
| Step | Topic | Original scope | Outcome |
|
||||
|------|-------|----------------|---------|
|
||||
| 1 | BIN | `AsyncPipeReaderInput.cs` (new sealed class) | ✅ Delivered (`ACCORE-BIN-T-D6H4`, Closed 2026-05-02) |
|
||||
| 2 | BIN | `AsyncPipeReaderInputExtensions.DrainFromAsync` | ✅ Delivered, but moved to test-only assembly during Step 1 follow-up (`ACCORE-BIN-T-M2K1`, Closed 2026-05-02) — framework stays consumer-implements-transport rather than exposing a public drain helper. |
|
||||
| 3 | BIN | `AcBinarySerializerPipeParallelTests.cs` rewrite — real parallel pipeline test | ✅ Delivered (`ACCORE-BIN-T-V7C9`, Closed 2026-05-02) |
|
||||
| 4 | BIN | `AcBinarySerializerNamedPipeExtensions.cs` (NamedPipe helpers) | **❌ Dropped.** Framework decision: stay transport-agnostic, expose only generic `PipeWriter` / `PipeReader` primitives. Tests own `NamedPipeServerStream` / `NamedPipeClientStream` lifecycles directly. See `BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-t6v2` for the doctrine. |
|
||||
| 5 | BIN | `AcBinarySerializerFileStreamExtensions.cs` (FileStream helpers) | **❌ Dropped.** Same rationale as Step 4. Consumers wrap `FileStream` with `PipeWriter.Create` / `PipeReader.Create` themselves. |
|
||||
| 6 | SBP | `AcBinaryHubProtocol.cs` migration to `AsyncPipeReaderInput`; `SegmentBufferReader.cs` + `SegmentBufferReaderInput.cs` deleted | ✅ Delivered (`ACCORE-SBP-T-G7T2`, Closed 2026-05-03). Both legacy types removed from disk; protocol now fully on `AsyncPipeReaderInput` (multiMessage:false — protocol parses `[201]/[202]` framing externally, AsyncPipe is a passive byte buffer here). |
|
||||
|
||||
The body of this ADR below describes the **as-designed** architecture (Steps 1–6). The dropped Steps 4 & 5 do not invalidate the unified-primitive consolidation that motivated the ADR — the receive-side primitive and the SignalR migration both delivered cleanly.
|
||||
|
||||
### Original status entry (historical)
|
||||
|
||||
Proposed (2026-04-27)
|
||||
|
||||
## Context
|
||||
|
|
@ -268,6 +285,8 @@ No transitive `Microsoft.AspNetCore.SignalR` dependency. The SignalR integration
|
|||
|
||||
### Migration plan (6 steps, each commit-reviewable)
|
||||
|
||||
> **Execution outcome (2026-05-03)**: see the **Execution log** at the top of this ADR for what actually shipped. Steps 1–3 + Step 6 delivered as designed; Steps 4 & 5 (NamedPipe / FileStream helpers) dropped — the framework stays consumer-implements-transport. The table below is preserved as the original migration plan for historical context.
|
||||
|
||||
| Step | Topic | Files | Review checkpoint |
|
||||
|------|-------|-------|-------------------|
|
||||
| 1 | BIN | `AsyncPipeReaderInput.cs` (NEW); existing `SegmentBufferReader.cs` + `SegmentBufferReaderInput.cs` unchanged | New class compiles, unit-tested in isolation; SignalR path still on old types |
|
||||
|
|
|
|||
Loading…
Reference in New Issue