AyCode.Core/AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs

554 lines
29 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
namespace AyCode.Core.Serializers.Binaries;
/// <summary>
/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization.
///
/// Self-contained <see cref="IBinaryInputBase"/> implementation that consolidates the legacy
/// <c>SegmentBufferReader</c> + <c>SegmentBufferReaderInput</c> pair into a single sealed class
/// (see ADR-0003 at <c>docs/adr/0003-acbinary-streaming-receive-architecture.md</c>).
///
/// The naming mirrors the send-side <c>AsyncPipeWriterOutput</c> primitive — both follow the
/// .NET BCL convention for type-level <c>Async</c> prefix (<c>AsyncEnumerable</c>,
/// <c>IAsyncDisposable</c>, <c>AsyncLocal&lt;T&gt;</c>, ...).
///
/// <para><see cref="Feed"/> behavior is driven by the <c>multiMessage</c> ctor flag:
/// <c>true</c> (default) — parses <c>[201][UINT16][data]</c> chunked frames + <c>[202]</c> end
/// marker (matches <c>AsyncPipeWriterOutput</c> framed output and SignalR's AsyncSegment wire
/// format); on every <c>[202]</c> the input <b>auto-resets</b> for the next message — multiple
/// <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// calls can reuse the same long-lived input over a single transport. <c>false</c> — appends bytes
/// verbatim (matches <c>AcBinarySerializer.SerializeChunked</c> raw output drained from a
/// <see cref="System.IO.Pipelines.PipeReader"/>); single-message scenario, no auto-reset.</para>
///
/// <para>Usage: <b>push pattern only</b>. The consumer's reader-task reads bytes from any
/// underlying transport (the framework knows nothing about which) and pushes them via
/// <see cref="Feed"/>; a separate consumer thread (or task) calls
/// <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>.
/// The framework does NOT own the transport — the consumer's reader-task does, following the
/// application's threading model.</para>
///
/// <para><b>When chunked-streaming is the right fit</b> (vs raw <c>byte[]</c> /
/// <see cref="AcBinaryDeserializer.Deserialize{T}(byte[], AcBinarySerializerOptions)"/>):</para>
/// <list type="bullet">
/// <item><b>Network transports</b> — TCP / UDP / WebSocket / SSE / HTTP/2 streams. Per-chunk
/// CPU overhead (~30 µs / chunk) is invisible next to ms-scale RTT; the streaming
/// pipeline lets sender, transport, and receiver work in parallel on different parts of
/// the message.</item>
/// <item><b>Multi-connection servers</b> — Kestrel-style (SignalR), gRPC servers, custom RPC
/// hosts. Per-connection peak memory bounded by buffer-size (e.g. 32 KB), not by max
/// message size — 1000 concurrent connections × 1 MB messages = 32 MB peak (vs 1 GB
/// with raw <c>byte[]</c>). LOH allocation pressure (≥ 85 KB messages) is also avoided.</item>
/// <item><b>Message brokers / queues</b> — Kafka / Redis Streams / Azure Service Bus clients
/// that expose <see cref="System.Buffers.IBufferWriter{T}"/> sinks. Streaming serialize
/// writes directly into the transport buffer — no intermediate <c>byte[]</c> allocation.</item>
/// <item><b>File streaming</b> — <c>FileStream</c> behind a
/// <see cref="System.IO.Pipelines.PipeReader"/>. 100 MB+ payloads from disk with constant
/// 32 KB peak memory.</item>
/// <item><b>In-memory <see cref="System.IO.Pipelines.Pipe"/> cross-thread handoff</b> —
/// producer + consumer threads coordinate over a shared <c>Pipe</c>; zero-copy slab handoff.</item>
/// <item><b>Custom transport adapters</b> — anything where the consumer wants to push bytes
/// from a transport-specific reader-task.</item>
/// </list>
///
/// <para><b>When raw <c>byte[]</c> is the right fit</b>: same-process loopback IPC where transport
/// latency is near zero, single-producer/single-consumer batch operations where peak memory is
/// not a constraint, sub-LOH messages (&lt; 85 KB) with no GC-pressure concerns. The chunked-streaming
/// per-chunk CPU overhead is fully visible in these scenarios — raw is faster end-to-end.</para>
///
/// <para><b>Performance characteristic</b>: per-chunk overhead is roughly constant (~25-30 µs —
/// FlushAsync syscall + ReadAsync syscall + framing-parse + sliding-window bookkeeping). Total
/// chunk-overhead = <c>(messageSize / chunkSize) × ~30 µs</c>. The streaming benefit is pipeline
/// parallelism + bounded peak memory — both of which require a non-trivial transport stage to
/// surface (network, file, cross-thread queue). On same-process loopback NamedPipe (the worst-case
/// benchmark scenario), the per-chunk cost dominates and chunked appears slower than raw — this
/// is a benchmark-artifact, not the production characteristic.</para>
///
/// Backed by a single contiguous <c>byte[]</c> from <see cref="ArrayPool{T}"/>. Positions reset
/// to 0 when the consumer catches up (sliding-window cycling — peak buffer memory bounded by
/// chunk size, NOT message size). Grow is the absolute last resort and practically never fires
/// under typical chunk-aligned write patterns.
///
/// <para>Thread-safety:</para>
/// <list type="bullet">
/// <item><c>_writePos</c>: written by producer (<c>Volatile.Write</c>), read by consumer
/// (<c>Volatile.Read</c>).</item>
/// <item><c>_readPos</c>: written by consumer (<c>Volatile.Write</c>), read by producer
/// (<c>Volatile.Read</c>).</item>
/// <item>Reset-to-0 happens in <see cref="Feed"/> only when <c>_readPos == _writePos &gt; 0</c>
/// (consumer is blocked in <see cref="TryAdvanceSegment"/>, not actively reading).</item>
/// <item>Grow happens in <see cref="Feed"/> only when reset is insufficient (consumer is
/// behind). The current buffer is kept alive in <c>_oldBuffers</c> until <see cref="Dispose"/>;
/// <see cref="TryAdvanceSegment"/> picks up the new buffer when called.</item>
/// </list>
///
/// <para>Recommended <c>initialCapacity</c>: <c>options.BufferWriterChunkSize × 2</c> —
/// two-chunks-worth of headroom plus reset-to-0 cycling reuses the same buffer for the message's
/// lifetime regardless of total payload size. SignalR-context: 8 KB (4 KB chunk × 2);
/// standalone-context: 128 KB (64 KB chunk × 2).</para>
/// </summary>
public sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable
{
private byte[] _buffer;
private int _writePos;
private int _readPos; // consumer reports consumed position here
private bool _completed;
// multi-message wire framing flag:
// true (default): Feed() parses [201][UINT16][data] chunked framing + [202] CHUNK_END markers,
// auto-resets the buffer cursor on every [202] for the next message.
// Matches AsyncPipeWriterOutput multi-message wire and SignalR AsyncSegment.
// false: Feed() appends bytes verbatim (no wire-format interpretation, single message
// scenario). Matches AcBinarySerializer.SerializeChunked raw output drained
// from a PipeReader.
private readonly bool _multiMessage;
// Framing state machine — parses [201][UINT16 LE size][data] frames + [202] CHUNK_END.
// [200] CHUNK_START tolerated (skipped). Wire format matches AsyncPipeWriterOutput's
// multi-message output and SignalR's AsyncSegment chunked frame format. Only active when
// _multiMessage = true.
private const byte ChunkStart = 200; // CHUNK_START — tolerated, skipped
private const byte ChunkData = 201; // CHUNK_DATA — header followed by [UINT16 size][data]
private const byte ChunkEnd = 202; // CHUNK_END — signals end-of-MESSAGE (auto-reset for next message)
private FramingState _framingState = FramingState.AwaitingHeader;
private int _sizeAccumulator; // partial UINT16 size during AwaitingSizeLow/High
private int _bytesRemainingInChunk; // remaining data bytes in current CHUNK_DATA frame
private enum FramingState : byte
{
AwaitingHeader, // expect [201] / [202] / [200]
AwaitingSizeLow, // have [201], expect UINT16 LE low byte
AwaitingSizeHigh, // have low, expect UINT16 LE high byte
AwaitingData, // expect _bytesRemainingInChunk data bytes
// No "Done" state — [202] auto-resets to AwaitingHeader for next-message reuse.
// Session-end is signalled by external Complete() / stream-EOF, NOT by framing-state.
}
private readonly ManualResetEventSlim _dataAvailable;
/// <summary>
/// Static diagnostic sink for state-machine transitions, framing-strip events, and buffer
/// state changes. <c>null</c> by default — set from tests / diagnostic tooling to capture
/// trace output. Only effective in DEBUG builds: <see cref="EmitDiagnostic"/> is
/// <see cref="ConditionalAttribute"/>-decorated, so call sites are completely removed in
/// RELEASE (zero runtime cost — string-interpolation arguments at call sites are NOT
/// evaluated either). The field stays as a single null-valued static reference in RELEASE
/// — negligible memory cost in exchange for clean analyzer state and simpler code.
/// </summary>
public static Action<string>? DiagnosticLog;
[Conditional("DEBUG")]
private static void EmitDiagnostic(string message) => DiagnosticLog?.Invoke(message);
// After grow: ALL old buffers are kept alive until Dispose.
// Cannot return them to the pool mid-operation because the consumer thread
// may hold a local reference to any of them (its local 'buffer' variable is
// only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind).
private byte[][]? _oldBuffers;
private int _oldBufferCount;
/// <summary>
/// Creates a new <see cref="AsyncPipeReaderInput"/> with the specified initial capacity.
/// Recommended: <c>options.BufferWriterChunkSize × 2</c> (e.g. 8 KB for the SignalR-context
/// 4 KB chunk size, 128 KB for the standalone 64 KB default).
/// </summary>
/// <param name="initialCapacity">Initial buffer size. Rounded up by ArrayPool.</param>
/// <param name="multiMessage">
/// <c>true</c> (default): <see cref="Feed"/> parses the multi-message wire framing
/// (<c>[201][UINT16][data]</c> chunks + <c>[202]</c> end-of-MESSAGE marker — matches
/// <see cref="AsyncPipeWriterOutput"/> multi-message output and SignalR's AsyncSegment).
/// On every <c>[202]</c> the input auto-resets the buffer cursor for the next message —
/// the same long-lived input can be reused across many
/// <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// calls without allocating a fresh instance per message. End of session is signalled by an
/// external <see cref="Complete"/> call or stream-EOF, NOT by <c>[202]</c>.
///
/// <c>false</c>: <see cref="Feed"/> appends bytes verbatim — single-message scenario where the
/// stream lifecycle equals the message lifecycle (matches <c>AcBinarySerializer.SerializeChunked</c>
/// raw output, paired with <c>pipeWriter.CompleteAsync()</c> as the end-of-message signal).
/// </param>
public AsyncPipeReaderInput(int initialCapacity, bool multiMessage = true)
{
if (initialCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(initialCapacity));
_buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_multiMessage = multiMessage;
_dataAvailable = new ManualResetEventSlim(false);
}
// --- Producer API (push) ---
/// <summary>
/// Feeds bytes into the consumer-visible buffer. Behavior is driven by the
/// <c>multiMessage</c> ctor flag:
/// <list type="bullet">
/// <item><b>multiMessage = true</b> (default): expects the multi-message wire format
/// <c>[201][UINT16 LE size][data]</c> per chunk, tolerates <c>[200]</c> CHUNK_START
/// prefix, treats <c>[202]</c> CHUNK_END as <b>end-of-MESSAGE</b>. State is preserved
/// across <c>Feed</c> calls — partial frame headers, mid-size boundaries, and mid-data
/// boundaries all resume correctly. On <c>[202]</c>, the input <b>auto-resets</b> the
/// buffer cursor for the next message (signals the producer's sliding-window cycling
/// to recycle the buffer on next <see cref="AppendToBuffer"/>) and resets the framing
/// state machine to <c>AwaitingHeader</c> — the next bytes are expected to be a new
/// <c>[201]...</c> frame. End-of-session is NOT signalled by <c>[202]</c>; only an
/// external <see cref="Complete"/> call or stream-EOF marks the session as ended.</item>
/// <item><b>multiMessage = false</b>: appends bytes verbatim — no wire-format interpretation.
/// The producer passes only payload bytes (e.g. raw byte stream drained from a
/// <see cref="System.IO.Pipelines.PipeReader"/> paired with
/// <c>AcBinarySerializer.SerializeChunked</c>). Single-message scenario; end-of-message
/// is the same as end-of-stream, signalled by external <see cref="Complete"/> call.</item>
/// </list>
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
public void Feed(ReadOnlySpan<byte> data)
{
if (data.IsEmpty) return;
if (!_multiMessage)
{
// Single-message mode: append verbatim, no framing interpretation.
AppendToBuffer(data);
return;
}
// Multi-message mode: state machine parses [201][UINT16 LE size][data] frames + [202] end-of-message marker.
var i = 0;
while (i < data.Length)
{
switch (_framingState)
{
case FramingState.AwaitingHeader:
{
var marker = data[i++];
if (marker == ChunkData)
{
_framingState = FramingState.AwaitingSizeLow;
}
else if (marker == ChunkStart)
{
// Tolerated (skip); stay in AwaitingHeader for next [201]/[202]
EmitDiagnostic("Feed: CHUNK_START [200] tolerated/skipped");
}
else if (marker == ChunkEnd)
{
// [202] = end of CURRENT message on the WIRE (NOT end of session). Reset only the
// framing state machine to AwaitingHeader for the next [201] header.
// Buffer-cursor recycling is NOT triggered here — the producer-thread cannot safely
// reset _writePos = 0 while the consumer-thread may still be reading earlier bytes
// of the just-finished message (multi-chunk messages: by the time the producer
// parses [202], the consumer may have only consumed part of the buffer).
// The buffer-cycle signal is emitted by the consumer via MessageDone() — typically
// from the AcBinaryDeserializer.Deserialize<T>(AsyncPipeReaderInput, opts) finally
// block, AFTER the deserialiser has finished reading and returned the graph.
// _completed stays false — only external Complete() / stream-EOF marks session end.
EmitDiagnostic("Feed: CHUNK_END [202] received — framing reset; awaiting MessageDone() from consumer for buffer recycle");
_framingState = FramingState.AwaitingHeader;
}
else
{
throw new InvalidDataException(
$"Unexpected framing marker byte 0x{marker:X2} ({marker}) — expected 200/201/202.");
}
break;
}
case FramingState.AwaitingSizeLow:
_sizeAccumulator = data[i++];
_framingState = FramingState.AwaitingSizeHigh;
break;
case FramingState.AwaitingSizeHigh:
_sizeAccumulator |= data[i++] << 8;
_bytesRemainingInChunk = _sizeAccumulator;
_sizeAccumulator = 0;
_framingState = FramingState.AwaitingData;
EmitDiagnostic($"Feed: chunk header parsed, dataSize={_bytesRemainingInChunk}");
if (_bytesRemainingInChunk == 0)
{
// Empty CHUNK_DATA frame — go back to header state immediately
_framingState = FramingState.AwaitingHeader;
}
break;
case FramingState.AwaitingData:
{
var available = data.Length - i;
var toAppend = Math.Min(_bytesRemainingInChunk, available);
if (toAppend > 0)
{
AppendToBuffer(data.Slice(i, toAppend));
i += toAppend;
_bytesRemainingInChunk -= toAppend;
}
if (_bytesRemainingInChunk == 0)
{
_framingState = FramingState.AwaitingHeader;
}
break;
}
}
}
}
/// <summary>
/// Appends data bytes to the internal buffer with sliding-window cycling
/// (reset to 0 when consumer has caught up OR a [202] message-end sentinel was raised) and
/// grow-as-last-resort. Signals the consumer.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
private void AppendToBuffer(ReadOnlySpan<byte> data)
{
// Cycle the buffer to 0 if either:
// (a) consumer has caught up to _writePos (classic sliding-window pattern), OR
// (b) a [202] CHUNK_END marker was just parsed and armed _readPos = -1 (sentinel) —
// the message is complete on the wire, the consumer (per wire-format guarantee)
// has read or will read exactly _writePos bytes; the next bytes are the start of
// a new message and belong at offset 0.
var rp = Volatile.Read(ref _readPos);
if (rp < 0 || (rp > 0 && rp == _writePos))
{
EmitDiagnostic($"AppendToBuffer reset positions rp={rp} wp={_writePos} → 0");
_writePos = 0;
Volatile.Write(ref _readPos, 0);
}
// Grow if buffer can't fit the new data (rare — consumer typically keeps pace)
if (_writePos + data.Length > _buffer.Length)
{
EmitDiagnostic($"AppendToBuffer grow required wp={_writePos} dataLen={data.Length} bufLen={_buffer.Length}");
Grow(_writePos + data.Length);
}
data.CopyTo(_buffer.AsSpan(_writePos));
var newWritePos = _writePos + data.Length;
Volatile.Write(ref _writePos, newWritePos);
_dataAvailable.Set();
EmitDiagnostic($"AppendToBuffer dataLen={data.Length} newWritePos={newWritePos} readPos={Volatile.Read(ref _readPos)}");
}
/// <summary>
/// Signals that no more data will be written. The consumer's <see cref="TryAdvanceSegment"/>
/// will return <c>false</c> once all buffered data is consumed.
/// </summary>
public void Complete()
{
Volatile.Write(ref _completed, true);
_dataAvailable.Set();
EmitDiagnostic($"Complete writePos={Volatile.Read(ref _writePos)} readPos={Volatile.Read(ref _readPos)}");
}
/// <summary>
/// Whether <see cref="Complete"/> has been called (typically by the consumer's reader-task
/// finally-block after the underlying transport signals EOF). Once <c>true</c>, the session
/// has ended — any pending <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// call returns whatever partial buffer is left, and subsequent calls return immediately.
/// </summary>
public bool IsCompleted => Volatile.Read(ref _completed);
/// <summary>
/// Called by the consumer to signal "I have finished reading the current message" — typically
/// from the <see cref="AcBinaryDeserializer.Deserialize{T}(AsyncPipeReaderInput, AcBinarySerializerOptions)"/>
/// finally block, AFTER the deserialiser has finished reading and the structurally-complete graph
/// has been returned. Resets BOTH <c>_readPos</c> AND <c>_writePos</c> to 0 atomically so the next
/// <see cref="Initialize"/> sees a fresh empty buffer (<c>bufferLength = 0</c>) and the consumer
/// blocks in <see cref="TryAdvanceSegment"/> until the producer's next message arrives — the
/// drain task's first <see cref="AppendToBuffer"/> for the next message writes from offset 0
/// (no cycling needed; positions are already 0).
///
/// <para><b>Why reset both positions, not just <c>_readPos</c>-sentinel</b>: a <c>_readPos = -1</c>
/// sentinel alone leaves <c>_writePos</c> at the previous message's end. If <see cref="Initialize"/>
/// runs BEFORE the drain task's next <see cref="AppendToBuffer"/> (a real race when single-chunk
/// messages fit in one transport pass), the consumer reads <c>bufferLength = _writePos</c> = stale
/// value, and starts deserialising the previous message's bytes from offset 0 — corruption. Resetting
/// both atomically here closes the race: <see cref="Initialize"/> always sees <c>bufferLength = 0</c>,
/// and the next <see cref="AppendToBuffer"/> writes at <c>_writePos = 0</c> (no cycle needed).</para>
///
/// <para><b>Why the consumer signals (not the producer)</b>: the producer parses <c>[202]</c>
/// strictly on the wire — at the moment <c>[202]</c> arrives, the consumer-thread may still be
/// mid-graph reading earlier bytes of the just-finished message (especially for multi-chunk
/// messages, where the producer outpaces the consumer). Recycling the buffer based on
/// producer-side <c>[202]</c> alone races with the in-flight consumer read. By emitting the
/// signal from the consumer's "I'm done"-moment instead, both orderings are guaranteed: the
/// consumer has finished reading, AND the wire-side <c>[202]</c> has long since been parsed
/// (since the consumer reads only what the producer wrote).</para>
///
/// <para><b>Thread-safety</b>: safe because the producer (drain task / <see cref="Feed"/>) cannot
/// have an <see cref="AppendToBuffer"/> in flight at this moment — the consumer's
/// <see cref="Deserialize{T}"/> just returned (graph complete = all bytes already appended), and
/// the producer-side <see cref="Serialize{T}"/> for the NEXT message has not yet been issued by
/// the calling thread (strictly sequential per-thread <c>Serialize → Deserialize</c> loop). Any
/// pending <c>[202]</c> still being parsed by the drain task only mutates framing state, never
/// invokes <see cref="AppendToBuffer"/>.</para>
///
/// <para><b>Idempotent</b>: safe to call multiple times. No-op if the session has already
/// completed (<see cref="IsCompleted"/> is <c>true</c>) — there are no further messages.</para>
/// </summary>
public void MessageDone()
{
if (Volatile.Read(ref _completed)) return; // session already over
Volatile.Write(ref _writePos, 0);
Volatile.Write(ref _readPos, 0);
EmitDiagnostic("MessageDone: positions reset (writePos=0, readPos=0) for next message");
}
// --- IBinaryInputBase (consumer thread) ---
/// <summary>
/// Provides the initial buffer state. Called once before deserialization begins.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Initialize(out byte[] buffer, out int position, out int bufferLength)
{
buffer = _buffer;
position = 0;
bufferLength = Volatile.Read(ref _writePos);
EmitDiagnostic($"Initialize bufferLength={bufferLength}");
}
/// <summary>
/// Called when the deserialization context needs more bytes than currently available.
/// Reports consumed position to the producer, then blocks via <see cref="ManualResetEventSlim"/>
/// until enough data arrives or <see cref="Complete"/> is called.
///
/// <para>Uses the double-check pattern to avoid missed signals:
/// <c>Reset() → check → if still not enough, Wait()</c>.</para>
///
/// <para>No cross-boundary handling needed — the buffer is a single contiguous <c>byte[]</c>.
/// After grow, re-reads <c>_buffer</c> to get the new (larger) array. After position reset
/// (readPos/writePos set to 0 by producer), re-reads adjusted positions.</para>
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.AggressiveOptimization)]
public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
{
EmitDiagnostic($"TryAdvanceSegment enter position={position} bufferLength={bufferLength} needed={needed}");
// Report how far we've consumed — enables producer to reset positions to 0.
// Sentinel respect: if _readPos < 0 (a [202] CHUNK_END marker armed it), DO NOT overwrite
// the sentinel — the next AppendToBuffer needs to see it to cycle the buffer to 0.
// The local sentinel-defence below ensures correct logic during the transient race window.
if (Volatile.Read(ref _readPos) >= 0)
{
Volatile.Write(ref _readPos, position);
}
while (true)
{
// Re-read positions (may have been reset to 0 by producer)
int rp = Volatile.Read(ref _readPos);
int wp = Volatile.Read(ref _writePos);
// Sentinel defence: if [202] armed _readPos = -1 while we were reading, treat the
// sentinel as "use our local position" — the cycle hasn't fired yet (no AppendToBuffer
// has run since [202]); we still consume from our own position into the existing buffer.
if (rp < 0) rp = position;
if (wp - rp >= needed)
{
buffer = _buffer; // may be new array after grow
position = rp; // may be 0 after reset
bufferLength = wp;
EmitDiagnostic($"TryAdvanceSegment return true (data available) position={position} bufferLength={bufferLength}");
return true;
}
if (Volatile.Read(ref _completed))
{
// No more data will arrive. Return whatever is left.
if (wp > rp)
{
buffer = _buffer;
position = rp;
bufferLength = wp;
EmitDiagnostic($"TryAdvanceSegment return true (completed, partial) position={position} bufferLength={bufferLength}");
return true;
}
EmitDiagnostic("TryAdvanceSegment return false (completed, empty)");
return false;
}
// Double-check pattern: Reset → verify → Wait
_dataAvailable.Reset();
rp = Volatile.Read(ref _readPos);
if (rp < 0) rp = position; // sentinel defence (same as the top of the loop)
wp = Volatile.Read(ref _writePos);
if (wp - rp >= needed || Volatile.Read(ref _completed)) continue;
EmitDiagnostic($"TryAdvanceSegment waiting (wp={wp} rp={rp} needed={needed})");
_dataAvailable.Wait();
EmitDiagnostic("TryAdvanceSegment woke up");
}
}
/// <summary>
/// No-op. Buffer lifecycle is managed by <see cref="Dispose"/>.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Release() { }
// --- Lifecycle ---
public void Dispose()
{
// Return all old buffers accumulated from grows
if (_oldBuffers != null)
{
for (var i = 0; i < _oldBufferCount; i++)
{
ArrayPool<byte>.Shared.Return(_oldBuffers[i]);
_oldBuffers[i] = null!;
}
_oldBuffers = null;
_oldBufferCount = 0;
}
// Return current buffer
if (_buffer != null!)
{
ArrayPool<byte>.Shared.Return(_buffer);
_buffer = null!;
}
_dataAvailable.Dispose();
}
// --- Internal ---
private void Grow(int requiredCapacity)
{
var newSize = Math.Max(_buffer.Length * 2, requiredCapacity);
var newBuffer = ArrayPool<byte>.Shared.Rent(newSize);
Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos);
// Keep the current buffer alive — consumer's local 'buffer' variable may still reference it
// (consumer may lag multiple grows behind before calling TryAdvanceSegment).
// Returning old buffers to the pool mid-operation would cause use-after-free
// if another pool user overwrites them while the consumer is still reading.
if (_oldBuffers == null) _oldBuffers = new byte[4][];
else if (_oldBufferCount == _oldBuffers.Length) Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2);
_oldBuffers[_oldBufferCount++] = _buffer;
_buffer = newBuffer;
}
// --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) ---
}