AyCode.Core/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs

464 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
namespace AyCode.Core.Serializers.Binaries;
/// <summary>
/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization.
///
/// Self-contained <see cref="IBinaryInputBase"/> implementation that consolidates the legacy
/// <c>SegmentBufferReader</c> + <c>SegmentBufferReaderInput</c> pair into a single sealed class
/// (see ADR-0003 at <c>docs/adr/0003-acbinary-streaming-receive-architecture.md</c>).
///
/// The naming mirrors the send-side <c>AsyncPipeWriterOutput</c> primitive — both follow the
/// .NET BCL convention for type-level <c>Async</c> prefix (<c>AsyncEnumerable</c>,
/// <c>IAsyncDisposable</c>, <c>AsyncLocal&lt;T&gt;</c>, ...).
///
/// <para><see cref="Feed"/> behavior is driven by the <c>multiMessage</c> ctor flag:
/// <c>true</c> (default) — parses <c>[201][UINT16][data]</c> chunked frames + <c>[202]</c> end
/// marker (matches <c>AsyncPipeWriterOutput</c> framed output and SignalR's AsyncSegment wire
/// format); on every <c>[202]</c> the input <b>auto-resets</b> for the next message — multiple
/// <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// calls can reuse the same long-lived input over a single transport. <c>false</c> — appends bytes
/// verbatim (matches <c>AcBinarySerializer.SerializeChunked</c> raw output drained from a
/// <see cref="System.IO.Pipelines.PipeReader"/>); single-message scenario, no auto-reset.</para>
///
/// <para>Usage modes:</para>
/// <list type="bullet">
/// <item><b>Push (Feed-API)</b>: producer thread calls <see cref="Feed"/> with chunk bytes
/// (typical for SignalR <c>TryParseChunkData</c>).</item>
/// <item><b>Pull (DrainFromAsync extension)</b>: helper drains a
/// <see cref="System.IO.Pipelines.PipeReader"/> into the input via repeated
/// <see cref="Feed"/> calls (typical for NamedPipe / FileStream / NetworkStream).</item>
/// </list>
///
/// Backed by a single contiguous <c>byte[]</c> from <see cref="ArrayPool{T}"/>. Positions reset
/// to 0 when the consumer catches up (sliding-window cycling — peak buffer memory bounded by
/// chunk size, NOT message size). Grow is the absolute last resort and practically never fires
/// under typical chunk-aligned write patterns.
///
/// <para>Thread-safety:</para>
/// <list type="bullet">
/// <item><c>_writePos</c>: written by the producer (<c>Volatile.Write</c>), read by the consumer
/// (<c>Volatile.Read</c>).</item>
/// <item><c>_readPos</c>: written by the consumer (position reports in
/// <see cref="TryAdvanceSegment"/>, done with compare-and-swap so a pending <c>-1</c> sentinel is
/// never overwritten) AND by the producer (the <c>-1</c> sentinel on <c>[202]</c>, and the reset
/// to 0). The consumer reads <c>_readPos</c> on both sides of its <c>_writePos</c> read and
/// retries when it changed, so it never pairs a pre-reset read position with a post-reset write
/// position.</item>
/// <item>Reset-to-0 happens in <see cref="Feed"/> (inside <c>AppendToBuffer</c>) when either
/// <c>_readPos == _writePos &gt; 0</c> (consumer is blocked in <see cref="TryAdvanceSegment"/>,
/// not actively reading) or <c>_readPos &lt; 0</c> (a <c>[202]</c> end-of-message sentinel).</item>
/// <item>Grow happens in <see cref="Feed"/> only when reset is insufficient (consumer is
/// behind). The current buffer is kept alive in <c>_oldBuffers</c> until <see cref="Dispose"/>;
/// <see cref="TryAdvanceSegment"/> picks up the new buffer when called.</item>
/// </list>
///
/// <para>NOTE(review): the sentinel-triggered reset writes the next message's first bytes at
/// offset 0 without waiting for the consumer, and <see cref="Initialize"/> always starts at
/// position 0. This appears to assume (a) the consumer has finished reading message N before the
/// first data bytes of message N+1 are fed (otherwise unread bytes of N are overwritten), and
/// (b) <see cref="Initialize"/> for message N+1 runs only after some of its bytes have been fed
/// (otherwise it re-reads message N from offset 0). Confirm callers serialize message processing
/// accordingly.</para>
///
/// <para>Recommended <c>initialCapacity</c>: <c>options.BufferWriterChunkSize × 2</c> —
/// two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the message's
/// lifetime regardless of total payload size. SignalR-context: 8 KB (4 KB chunk × 2);
/// standalone-context: 128 KB (64 KB chunk × 2).</para>
/// </summary>
public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
{
    // Largest byte[] length the runtime allows (same value as Array.MaxLength on .NET 6+);
    // used to cap the doubling in Grow without depending on a newer BCL API.
    private const int MaxByteArrayLength = 0x7FFFFFC7;

    private byte[] _buffer;
    private int _writePos;
    private int _readPos; // consumer reports consumed position here; -1 = [202] sentinel armed by Feed
    private bool _completed;

    // multi-message wire framing flag:
    //   true (default): Feed() parses [201][UINT16][data] chunked framing + [202] CHUNK_END markers,
    //                   auto-resets the buffer cursor on every [202] for the next message.
    //                   Matches AsyncPipeWriterOutput multi-message wire and SignalR AsyncSegment.
    //   false:          Feed() appends bytes verbatim (no wire-format interpretation, single message
    //                   scenario). Matches AcBinarySerializer.SerializeChunked raw output drained
    //                   from a PipeReader.
    private readonly bool _multiMessage;

    // Framing state machine — parses [201][UINT16 LE size][data] frames + [202] CHUNK_END.
    // [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's
    // multi-message output and SignalR's AsyncSegment chunked frame format. Only active when
    // _multiMessage = true.
    private const byte ChunkStart = 200; // CHUNK_START — tolerated, skipped
    private const byte ChunkData = 201;  // CHUNK_DATA  — header followed by [UINT16 size][data]
    private const byte ChunkEnd = 202;   // CHUNK_END   — signals end-of-MESSAGE (auto-reset for next message)

    private FramingState _framingState = FramingState.AwaitingHeader;
    private int _sizeAccumulator;        // partial UINT16 size during AwaitingSizeLow/High
    private int _bytesRemainingInChunk;  // remaining data bytes in current CHUNK_DATA frame

    private enum FramingState : byte
    {
        AwaitingHeader,   // expect [201] / [202] / [200]
        AwaitingSizeLow,  // have [201], expect UINT16 LE low byte
        AwaitingSizeHigh, // have low, expect UINT16 LE high byte
        AwaitingData,     // expect _bytesRemainingInChunk data bytes
        // No "Done" state — [202] auto-resets to AwaitingHeader for next-message reuse.
        // Session-end is signalled by external Complete() / stream-EOF, NOT by framing-state.
    }

    private readonly ManualResetEventSlim _dataAvailable;

    // --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) ---

    /// <summary>
    /// Static diagnostic sink for state-machine transitions, framing-strip events, and buffer
    /// state changes. <c>null</c> by default — set from tests / diagnostic tooling to capture
    /// trace output. Only effective in DEBUG builds: <see cref="EmitDiagnostic"/> is
    /// <see cref="ConditionalAttribute"/>-decorated, so call sites are completely removed in
    /// RELEASE (zero runtime cost — string-interpolation arguments at call sites are NOT
    /// evaluated either). The field stays as a single null-valued static reference in RELEASE
    /// — negligible memory cost in exchange for clean analyzer state and simpler code.
    /// </summary>
    public static Action<string>? DiagnosticLog;

    [Conditional("DEBUG")]
    private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message);

    // After grow: ALL old buffers are kept alive until Dispose.
    // Cannot return them to the pool mid-operation because the consumer thread
    // may hold a local reference to any of them (its local 'buffer' variable is
    // only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind).
    private byte[][]? _oldBuffers;
    private int _oldBufferCount;

    /// <summary>
    /// Creates a new <see cref="AsyncPipeReaderInput"/> with the specified initial capacity.
    /// Recommended: <c>options.BufferWriterChunkSize × 2</c> (e.g. 8 KB for the SignalR-context
    /// 4 KB chunk size, 128 KB for the standalone 64 KB default).
    /// </summary>
    /// <param name="initialCapacity">Initial buffer size. Rounded up by ArrayPool.</param>
    /// <param name="multiMessage">
    /// <c>true</c> (default): <see cref="Feed"/> parses the multi-message wire framing
    /// (<c>[201][UINT16][data]</c> chunks + <c>[202]</c> end-of-MESSAGE marker — matches
    /// <see cref="AsyncPipeWriterOutput"/> multi-message output and SignalR's AsyncSegment).
    /// On every <c>[202]</c> the input auto-resets the buffer cursor for the next message —
    /// the same long-lived input can be reused across many
    /// <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
    /// calls without allocating a fresh instance per message. End of session is signalled by an
    /// external <see cref="Complete"/> call or stream-EOF, NOT by <c>[202]</c>.
    ///
    /// <c>false</c>: <see cref="Feed"/> appends bytes verbatim — single-message scenario where the
    /// stream lifecycle equals the message lifecycle (matches <c>AcBinarySerializer.SerializeChunked</c>
    /// raw output, paired with <c>pipeWriter.CompleteAsync()</c> as the end-of-message signal).
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="initialCapacity"/> is zero or negative.</exception>
    public AsyncPipeReaderInput(int initialCapacity, bool multiMessage = true)
    {
        if (initialCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(initialCapacity));

        _buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
        _multiMessage = multiMessage;
        _dataAvailable = new ManualResetEventSlim(false);
    }

    // --- Producer API (push) ---

    /// <summary>
    /// Feeds bytes into the consumer-visible buffer. Behavior is driven by the
    /// <c>multiMessage</c> ctor flag:
    /// <list type="bullet">
    /// <item><b>multiMessage = true</b> (default): expects the multi-message wire format
    /// <c>[201][UINT16 LE size][data]</c> per chunk, tolerates <c>[200]</c> CHUNK_START
    /// prefix, treats <c>[202]</c> CHUNK_END as <b>end-of-MESSAGE</b>. State is preserved
    /// across <c>Feed</c> calls — partial frame headers, mid-size boundaries, and mid-data
    /// boundaries all resume correctly. On <c>[202]</c>, the input <b>auto-resets</b> the
    /// buffer cursor for the next message (signals the producer's sliding-window cycling
    /// to recycle the buffer on next <see cref="AppendToBuffer"/>) and resets the framing
    /// state machine to <c>AwaitingHeader</c> — the next bytes are expected to be a new
    /// <c>[201]...</c> frame. End-of-session is NOT signalled by <c>[202]</c>; only an
    /// external <see cref="Complete"/> call or stream-EOF marks the session as ended.</item>
    /// <item><b>multiMessage = false</b>: appends bytes verbatim — no wire-format interpretation.
    /// The producer passes only payload bytes (e.g. raw byte stream drained from a
    /// <see cref="System.IO.Pipelines.PipeReader"/> paired with
    /// <c>AcBinarySerializer.SerializeChunked</c>). Single-message scenario; end-of-message
    /// is the same as end-of-stream, signalled by external <see cref="Complete"/> call.</item>
    /// </list>
    /// Must only be called from the single producer thread.
    /// </summary>
    /// <param name="data">Bytes to feed; an empty span is a no-op.</param>
    /// <exception cref="InvalidDataException">
    /// Multi-message mode only: a frame header byte other than 200/201/202 was encountered.
    /// </exception>
    public void Feed(ReadOnlySpan<byte> data)
    {
        if (data.IsEmpty) return;

        if (!_multiMessage)
        {
            // Single-message mode: append verbatim, no framing interpretation.
            AppendToBuffer(data);
            return;
        }

        // Multi-message mode: state machine parses [201][UINT16 LE size][data] frames + [202] end-of-message marker.
        var i = 0;
        while (i < data.Length)
        {
            switch (_framingState)
            {
                case FramingState.AwaitingHeader:
                {
                    var marker = data[i++];
                    if (marker == ChunkData)
                    {
                        _framingState = FramingState.AwaitingSizeLow;
                    }
                    else if (marker == ChunkStart)
                    {
                        // Tolerated (skip); stay in AwaitingHeader for next [201]/[202]
                        EmitDiagnostic("Feed: CHUNK_START [200] tolerated/skipped");
                    }
                    else if (marker == ChunkEnd)
                    {
                        // [202] = end of CURRENT message (NOT end of session). Two-step signal:
                        //   (a) reset framing state machine to AwaitingHeader for the next [201] header,
                        //   (b) write _readPos = -1 sentinel — picked up by the next AppendToBuffer's
                        //       sliding-window cycling, which resets the buffer to 0 for the new message.
                        // _completed stays false — only external Complete() / stream-EOF marks session end.
                        // While the sentinel is armed, TryAdvanceSegment substitutes the caller's own
                        // position for _readPos and never overwrites the sentinel (compare-and-swap).
                        // Initialize does NOT inspect the sentinel — it always starts at 0, so it relies
                        // on the next message's first AppendToBuffer having already cycled the buffer.
                        EmitDiagnostic("Feed: CHUNK_END [202] received — framing reset, _readPos sentinel armed");
                        _framingState = FramingState.AwaitingHeader;
                        Volatile.Write(ref _readPos, -1);
                    }
                    else
                    {
                        throw new InvalidDataException(
                            $"Unexpected framing marker byte 0x{marker:X2} ({marker}) — expected 200/201/202.");
                    }
                    break;
                }

                case FramingState.AwaitingSizeLow:
                    _sizeAccumulator = data[i++];
                    _framingState = FramingState.AwaitingSizeHigh;
                    break;

                case FramingState.AwaitingSizeHigh:
                    _sizeAccumulator |= data[i++] << 8;
                    _bytesRemainingInChunk = _sizeAccumulator;
                    _sizeAccumulator = 0;
                    _framingState = FramingState.AwaitingData;
                    EmitDiagnostic($"Feed: chunk header parsed, dataSize={_bytesRemainingInChunk}");
                    if (_bytesRemainingInChunk == 0)
                    {
                        // Empty CHUNK_DATA frame — go back to header state immediately
                        _framingState = FramingState.AwaitingHeader;
                    }
                    break;

                case FramingState.AwaitingData:
                {
                    var available = data.Length - i;
                    var toAppend = Math.Min(_bytesRemainingInChunk, available);
                    if (toAppend > 0)
                    {
                        AppendToBuffer(data.Slice(i, toAppend));
                        i += toAppend;
                        _bytesRemainingInChunk -= toAppend;
                    }
                    if (_bytesRemainingInChunk == 0)
                    {
                        _framingState = FramingState.AwaitingHeader;
                    }
                    break;
                }
            }
        }
    }

    /// <summary>
    /// Appends data bytes to the internal buffer with sliding-window cycling
    /// (reset to 0 when consumer has caught up OR a [202] message-end sentinel was raised) and
    /// grow-as-last-resort. Publishes the new write position and signals the consumer.
    /// Producer thread only.
    /// </summary>
    private void AppendToBuffer(ReadOnlySpan<byte> data)
    {
        // Cycle the buffer to 0 if either:
        //   (a) consumer has caught up to _writePos (classic sliding-window pattern), OR
        //   (b) a [202] CHUNK_END marker was just parsed and armed _readPos = -1 (sentinel) —
        //       the next bytes are the start of a new message and are placed at offset 0.
        //       NOTE(review): (b) does not wait for the consumer; it assumes the previous
        //       message has already been fully read.
        var rp = Volatile.Read(ref _readPos);
        if (rp < 0 || (rp > 0 && rp == _writePos))
        {
            EmitDiagnostic($"AppendToBuffer reset positions rp={rp} wp={_writePos} → 0");
            // _writePos first, then _readPos: the _readPos store is a release, so a consumer that
            // observes _readPos == 0 also observes _writePos == 0 (or a later value).
            Volatile.Write(ref _writePos, 0);
            Volatile.Write(ref _readPos, 0);
        }

        // Grow if buffer can't fit the new data (rare — consumer typically keeps pace)
        if (_writePos + data.Length > _buffer.Length)
        {
            EmitDiagnostic($"AppendToBuffer grow required wp={_writePos} dataLen={data.Length} bufLen={_buffer.Length}");
            Grow(_writePos + data.Length);
        }

        data.CopyTo(_buffer.AsSpan(_writePos));
        var newWritePos = _writePos + data.Length;

        // Release-publish: the copied bytes (and a swapped _buffer after Grow) become visible to a
        // consumer that acquires this _writePos value.
        Volatile.Write(ref _writePos, newWritePos);
        _dataAvailable.Set();
        EmitDiagnostic($"AppendToBuffer dataLen={data.Length} newWritePos={newWritePos} readPos={Volatile.Read(ref _readPos)}");
    }

    /// <summary>
    /// Signals that no more data will be written. The consumer's <see cref="TryAdvanceSegment"/>
    /// will return <c>false</c> once all buffered data is consumed.
    /// </summary>
    public void Complete()
    {
        Volatile.Write(ref _completed, true);
        _dataAvailable.Set();
        EmitDiagnostic($"Complete writePos={Volatile.Read(ref _writePos)} readPos={Volatile.Read(ref _readPos)}");
    }

    // --- IBinaryInputBase (consumer thread) ---

    /// <summary>
    /// Provides the initial buffer state at the start of a deserialization pass (in multi-message
    /// mode, once per message). Always reports <paramref name="position"/> = 0.
    /// </summary>
    /// <param name="buffer">The current backing array.</param>
    /// <param name="position">Always 0.</param>
    /// <param name="bufferLength">Number of valid bytes currently in <paramref name="buffer"/>.</param>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void Initialize(out byte[] buffer, out int position, out int bufferLength)
    {
        // Read _writePos BEFORE the buffer reference: the producer swaps _buffer (Grow) before it
        // publishes a larger _writePos, so acquiring _writePos first guarantees the array we read
        // afterwards actually contains bufferLength valid bytes.
        bufferLength = Volatile.Read(ref _writePos);
        buffer = Volatile.Read(ref _buffer);
        position = 0;
        EmitDiagnostic($"Initialize bufferLength={bufferLength}");
    }

    /// <summary>
    /// Called when the deserialization context needs more bytes than currently available.
    /// Reports consumed position to the producer, then blocks via <see cref="ManualResetEventSlim"/>
    /// until enough data arrives or <see cref="Complete"/> is called.
    ///
    /// <para>Uses the double-check pattern to avoid missed signals:
    /// <c>Reset() → check → if still not enough, Wait()</c>.</para>
    ///
    /// <para>No cross-boundary handling needed — the buffer is a single contiguous <c>byte[]</c>.
    /// After grow, re-reads <c>_buffer</c> to get the new (larger) array. After position reset
    /// (readPos/writePos set to 0 by producer), re-reads adjusted positions.</para>
    /// </summary>
    /// <param name="buffer">In: the array being read. Out: the current backing array.</param>
    /// <param name="position">In: consumed position. Out: position to resume reading from.</param>
    /// <param name="bufferLength">Out: number of valid bytes in <paramref name="buffer"/>.</param>
    /// <param name="needed">Number of additional bytes the caller requires.</param>
    /// <returns>
    /// <c>true</c> when at least <paramref name="needed"/> bytes are available, or when the input is
    /// completed and some (fewer) bytes remain; <c>false</c> when completed and nothing remains.
    /// </returns>
    [MethodImpl(MethodImplOptions.NoInlining)]
    public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
    {
        EmitDiagnostic($"TryAdvanceSegment enter position={position} bufferLength={bufferLength} needed={needed}");

        // Report how far we've consumed — enables producer to reset positions to 0.
        // Never overwrites a [202] sentinel (-1); the next AppendToBuffer needs to see it.
        ReportConsumedPosition(position);

        while (true)
        {
            // Consistent (readPos, writePos) pair; may reflect a producer reset to 0.
            SnapshotPositions(position, out var rp, out var wp);

            if (wp - rp >= needed)
            {
                buffer = Volatile.Read(ref _buffer); // may be new array after grow
                position = rp;                       // may be 0 after reset
                bufferLength = wp;
                EmitDiagnostic($"TryAdvanceSegment return true (data available) position={position} bufferLength={bufferLength}");
                return true;
            }

            if (Volatile.Read(ref _completed))
            {
                // No more data will arrive. Return whatever is left.
                if (wp > rp)
                {
                    buffer = Volatile.Read(ref _buffer);
                    position = rp;
                    bufferLength = wp;
                    EmitDiagnostic($"TryAdvanceSegment return true (completed, partial) position={position} bufferLength={bufferLength}");
                    return true;
                }

                EmitDiagnostic("TryAdvanceSegment return false (completed, empty)");
                return false;
            }

            // Double-check pattern: Reset → verify → Wait. Any Set() issued after this Reset()
            // wakes the Wait() below, so data published in between cannot be missed.
            _dataAvailable.Reset();
            SnapshotPositions(position, out rp, out wp);
            if (wp - rp >= needed || Volatile.Read(ref _completed)) continue;

            EmitDiagnostic($"TryAdvanceSegment waiting (wp={wp} rp={rp} needed={needed})");
            _dataAvailable.Wait();
            EmitDiagnostic("TryAdvanceSegment woke up");
        }
    }

    /// <summary>
    /// Publishes the consumer's <paramref name="position"/> into <c>_readPos</c> unless the
    /// producer has armed the <c>-1</c> end-of-message sentinel. Compare-and-swap closes the race
    /// where a plain "check, then write" would overwrite a sentinel armed between the two steps.
    /// </summary>
    private void ReportConsumedPosition(int position)
    {
        var current = Volatile.Read(ref _readPos);
        while (current >= 0)
        {
            var observed = Interlocked.CompareExchange(ref _readPos, position, current);
            if (observed == current) return; // published
            current = observed;              // producer changed it (reset or sentinel) — re-evaluate
        }
    }

    /// <summary>
    /// Reads a consistent (<c>_readPos</c>, <c>_writePos</c>) pair. <c>_readPos</c> is read on both
    /// sides of the <c>_writePos</c> read and the pair is retried if it changed, so a producer reset
    /// between the reads can't produce a stale-read / fresh-write combination. A <c>-1</c> sentinel
    /// is replaced with <paramref name="consumerPosition"/> (no reset has happened yet, so the
    /// consumer keeps reading the existing buffer from its own position).
    /// </summary>
    private void SnapshotPositions(int consumerPosition, out int readPos, out int writePos)
    {
        while (true)
        {
            var rp = Volatile.Read(ref _readPos);
            var wp = Volatile.Read(ref _writePos);
            if (Volatile.Read(ref _readPos) != rp) continue; // producer moved _readPos mid-snapshot

            readPos = rp < 0 ? consumerPosition : rp;
            writePos = wp;
            return;
        }
    }

    /// <summary>
    /// No-op. Buffer lifecycle is managed by <see cref="Dispose"/>.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void Release() { }

    // --- Lifecycle ---

    /// <summary>
    /// Returns the current buffer and every buffer retired by <see cref="Grow"/> to
    /// <see cref="ArrayPool{T}.Shared"/> and disposes the wait handle. Safe to call more than once.
    /// Must not be called while the producer or consumer is still using the instance.
    /// </summary>
    public void Dispose()
    {
        // Return all old buffers accumulated from grows
        if (_oldBuffers is not null)
        {
            for (var i = 0; i < _oldBufferCount; i++)
            {
                ArrayPool<byte>.Shared.Return(_oldBuffers[i]);
                _oldBuffers[i] = null!;
            }
            _oldBuffers = null;
            _oldBufferCount = 0;
        }

        // Return current buffer
        if (_buffer is not null)
        {
            ArrayPool<byte>.Shared.Return(_buffer);
            _buffer = null!;
        }

        _dataAvailable.Dispose();
    }

    // --- Internal ---

    /// <summary>
    /// Replaces <c>_buffer</c> with a pooled array of at least <paramref name="requiredCapacity"/>
    /// bytes (normally double the current size), copying the first <c>_writePos</c> bytes.
    /// Producer thread only; the caller publishes the swap via its subsequent
    /// <c>Volatile.Write</c> of <c>_writePos</c>.
    /// </summary>
    private void Grow(int requiredCapacity)
    {
        // Double in long arithmetic so buffers above 1 GB don't overflow int, capped at the
        // runtime's maximum byte[] length.
        var doubled = (int)Math.Min((long)_buffer.Length * 2, MaxByteArrayLength);
        var newSize = Math.Max(doubled, requiredCapacity);
        var newBuffer = ArrayPool<byte>.Shared.Rent(newSize);
        Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos);

        // Keep the current buffer alive — consumer's local 'buffer' variable may still reference it
        // (consumer may lag multiple grows behind before calling TryAdvanceSegment).
        // Returning old buffers to the pool mid-operation would cause use-after-free
        // if another pool user overwrites them while the consumer is still reading.
        if (_oldBuffers is null) _oldBuffers = new byte[4][];
        else if (_oldBufferCount == _oldBuffers.Length) Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2);
        _oldBuffers[_oldBufferCount++] = _buffer;

        Volatile.Write(ref _buffer, newBuffer);
    }
}