[LOADED_DOCS: .github\copilot-instructions.md]

Refactor: replace PipeReaderBinaryInput with SegmentBufferReader

- Remove PipeReaderBinaryInput and all related code.
- Add SegmentBufferReader and SegmentBufferReaderInput for efficient, thread-safe chunked streaming deserialization.
- Update AcBinaryDeserializer and AcBinaryHubProtocol to use the new buffer for async segment protocol, improving state management and background deserialization.
- Enhance chunked protocol handling: skip re-presented chunk start frames, track consumed chunk frame bytes, and improve logging.
- Update test infrastructure to support async segment protocol and add AsyncSegmentPipeTransportWriter for realistic testing.
- Update settings.local.json to reflect new build/test commands and remove obsolete files.
- Improves performance, reliability, and testability of chunked SignalR streaming.
This commit is contained in:
Loretta 2026-04-18 14:31:27 +02:00
parent a5d2cd0b0e
commit 4343ab4d53
10 changed files with 687 additions and 288 deletions

View File

@ -52,7 +52,11 @@
"Bash(dotnet --version)", "Bash(dotnet --version)",
"WebSearch", "WebSearch",
"Bash(dotnet script:*)", "Bash(dotnet script:*)",
"Bash(xargs wc:*)" "Bash(xargs wc:*)",
"PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Core\\\\AyCode.Core.csproj\" --no-restore 2>&1)",
"PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Services\\\\AyCode.Services.csproj\" --no-restore 2>&1)",
"PowerShell(Remove-Item \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Core\\\\Serializers\\\\Binaries\\\\PipeReaderBinaryInput.cs\" -Confirm:$false)",
"PowerShell(dotnet build \"H:\\\\Applications\\\\Aycode\\\\Source\\\\AyCode.Core\\\\AyCode.Services.Server.Tests\\\\AyCode.Services.Server.Tests.csproj\" --no-restore 2>&1)"
] ]
} }
} }

View File

@ -292,17 +292,13 @@ public static partial class AcBinaryDeserializer
} }
/// <summary> /// <summary>
/// Deserialize from PipeReader with segment streaming (read per chunk via PipeReaderBinaryInput). /// Deserialize from a <see cref="SegmentBufferReader"/> with streaming pipeline parallelism.
/// Data is consumed as it arrives from the network, enabling pipeline parallelism. /// The producer thread writes chunk data via <see cref="SegmentBufferReader.Write"/>,
/// while this method (running on a background thread) deserializes incrementally,
/// blocking on <see cref="System.Threading.ManualResetEventSlim"/> when data is exhausted.
/// </summary> /// </summary>
public static T? Deserialize<T>(System.IO.Pipelines.PipeReader pipeReader, AcBinarySerializerOptions options) public static object? Deserialize(SegmentBufferReader reader, Type targetType, AcBinarySerializerOptions options)
=> DeserializeSequence<T, PipeReaderBinaryInput>(new PipeReaderBinaryInput(pipeReader), typeof(T), options); => DeserializeSequence<SegmentBufferReaderInput>(new SegmentBufferReaderInput(reader), targetType, options);
/// <summary>
/// Deserialize from PipeReader to specified type with segment streaming.
/// </summary>
public static object? Deserialize(System.IO.Pipelines.PipeReader pipeReader, Type targetType, AcBinarySerializerOptions options)
=> DeserializeSequence<PipeReaderBinaryInput>(new PipeReaderBinaryInput(pipeReader), targetType, options);
/// <summary> /// <summary>
/// Internal: Deserialize with any TInput (multi-segment or other future input types). /// Internal: Deserialize with any TInput (multi-segment or other future input types).

View File

@ -1,236 +0,0 @@
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
namespace AyCode.Core.Serializers.Binaries;
/// <summary>
/// Binary input that reads from a PipeReader, requesting more data when the current buffer is exhausted.
///
/// Mirrors SequenceBinaryInput's segment iteration and cross-boundary handling, but instead of
/// reading from a fixed ReadOnlySequence, it calls PipeReader.ReadAsync() to get more data
/// as chunks arrive from the network.
///
/// When the writer flushes per chunk (AsyncPipeWriterOutput), ReadAsync() returns as soon as
/// data is available — typically completing synchronously. This enables pipeline parallelism:
/// serialization, network transfer, and deserialization overlap.
/// </summary>
public struct PipeReaderBinaryInput : IBinaryInputBase
{
private readonly PipeReader _pipeReader;
private ReadOnlySequence<byte> _currentBuffer;
private SequencePosition _nextSegmentPosition;
private SequencePosition _consumedUpTo;
private bool _pipeCompleted;
private bool _hasActiveRead;
// Cross-boundary scratch — same pattern as SequenceBinaryInput
private byte[]? _scratchBuffer;
private bool _afterCrossBoundary;
private byte[]? _savedBuffer;
private int _savedPosition;
private int _savedBufferLength;
/// <summary>
/// Synchronously gets the result of a PipeReader.ReadAsync ValueTask.
/// Fast-path: if already completed (data in pipe), returns directly without Task allocation.
/// Slow-path: converts to Task for proper blocking when waiting for writer.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ReadResult SyncReadResult(ValueTask<ReadResult> vt)
=> vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
public PipeReaderBinaryInput(PipeReader pipeReader)
{
_pipeReader = pipeReader;
_currentBuffer = default;
_nextSegmentPosition = default;
_consumedUpTo = default;
_pipeCompleted = false;
_hasActiveRead = false;
_scratchBuffer = null;
_afterCrossBoundary = false;
_savedBuffer = null;
_savedPosition = 0;
_savedBufferLength = 0;
}
/// <summary>
/// Reads the first data from the PipeReader and provides the first segment's buffer.
/// </summary>
public void Initialize(out byte[] buffer, out int position, out int bufferLength)
{
var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted;
_hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start;
if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0)
throw new AcBinaryDeserializationException("Empty pipe — no data to read.");
_consumedUpTo = _nextSegmentPosition; // Mark first segment as consumed (same as TryLoadNextSegmentFromBuffer)
ExtractArray(memory, out buffer, out position, out bufferLength);
}
/// <summary>
/// Advances to the next segment. If the current ReadResult buffer is exhausted,
/// calls PipeReader.ReadAsync() to get more data from the pipe.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
{
// After cross-boundary scratch read: restore to the last touched segment
if (_afterCrossBoundary)
{
_afterCrossBoundary = false;
buffer = _savedBuffer!;
position = _savedPosition;
bufferLength = _savedBufferLength;
if (bufferLength - position >= needed)
return true;
}
var remaining = bufferLength - position;
if (remaining > 0 && remaining < needed)
return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining);
// Try next segment in current buffer
if (TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength))
{
remaining = bufferLength - position;
if (remaining >= needed) return true;
return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining);
}
// Current buffer exhausted — request more data from PipeReader
if (_pipeCompleted)
return false;
_pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
_hasActiveRead = false;
var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted;
_hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start;
if (!TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength))
return false;
remaining = bufferLength - position;
if (remaining >= needed) return true;
return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining);
}
/// <summary>
/// Returns the scratch buffer and signals the PipeReader that all data has been consumed.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Release()
{
if (_scratchBuffer != null)
{
ArrayPool<byte>.Shared.Return(_scratchBuffer);
_scratchBuffer = null;
}
if (_hasActiveRead)
{
_pipeReader.AdvanceTo(_currentBuffer.End);
_hasActiveRead = false;
}
}
private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength)
{
if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0)
return false;
ExtractArray(memory, out buffer, out position, out bufferLength);
_consumedUpTo = _nextSegmentPosition;
return true;
}
private bool TryReadCrossBoundary(ref byte[] buffer, ref int position, ref int bufferLength, int needed, int remaining)
{
if (_scratchBuffer == null || _scratchBuffer.Length < needed)
{
if (_scratchBuffer != null)
ArrayPool<byte>.Shared.Return(_scratchBuffer);
_scratchBuffer = ArrayPool<byte>.Shared.Rent(needed);
}
// 1) Copy tail of current segment
Buffer.BlockCopy(buffer, position, _scratchBuffer, 0, remaining);
var filled = remaining;
// 2) Copy from subsequent segments (may span multiple segments or pipe reads)
while (filled < needed)
{
// Try next segment in current buffer
if (_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) && memory.Length > 0)
{
ExtractArray(memory, out var segArray, out var segOffset, out var segBufferLength);
_consumedUpTo = _nextSegmentPosition;
var segCount = segBufferLength - segOffset;
var take = Math.Min(needed - filled, segCount);
Buffer.BlockCopy(segArray, segOffset, _scratchBuffer, filled, take);
filled += take;
_savedBuffer = segArray;
_savedPosition = segOffset + take;
_savedBufferLength = segBufferLength;
continue;
}
// Current buffer exhausted — read more from pipe
if (_pipeCompleted)
return false;
_pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
_hasActiveRead = false;
var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted;
_hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start;
if (_currentBuffer.IsEmpty && _pipeCompleted)
return false;
}
buffer = _scratchBuffer;
position = 0;
bufferLength = filled;
_afterCrossBoundary = true;
return true;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ExtractArray(ReadOnlyMemory<byte> memory, out byte[] buffer, out int position, out int bufferLength)
{
if (MemoryMarshal.TryGetArray(memory, out var segment))
{
buffer = segment.Array!;
position = segment.Offset;
bufferLength = segment.Offset + segment.Count;
}
else
{
var temp = new byte[memory.Length];
memory.Span.CopyTo(temp);
buffer = temp;
position = 0;
bufferLength = temp.Length;
}
}
}

View File

@ -0,0 +1,226 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.Threading;
using Microsoft.Extensions.Logging;
namespace AyCode.Core.Serializers.Binaries;
/// <summary>
/// Thread-safe, single-producer/single-consumer byte buffer for chunked streaming deserialization.
///
/// Replaces <see cref="System.IO.Pipelines.Pipe"/> for the AsyncSegment read path:
/// the main thread writes incoming chunk data via <see cref="Write"/>, while a background
/// deserialization thread reads through <see cref="SegmentBufferReaderInput"/> which blocks
/// on <see cref="ManualResetEventSlim"/> when data is exhausted.
///
/// Backed by a single contiguous <c>byte[]</c> from <see cref="ArrayPool{T}"/>.
/// Positions reset to 0 when the consumer catches up (zero-cost reuse).
/// Grow is the absolute last resort (single Rent, practically never happens).
///
/// Thread-safety:
/// <list type="bullet">
/// <item><c>_writePos</c>: written by producer (Volatile.Write), read by consumer (Volatile.Read).</item>
/// <item><c>_readPos</c>: written by consumer (Volatile.Write), read by producer (Volatile.Read).</item>
/// <item>Reset-to-0 happens in <see cref="Write"/> only when <c>_readPos == _writePos</c>
/// (consumer is blocked in TryAdvanceSegment, not reading the buffer).</item>
/// <item>Grow happens in <see cref="Write"/> only when reset is insufficient
/// (consumer is behind). Old buffer kept for consumer's local reference;
/// <see cref="SegmentBufferReaderInput.TryAdvanceSegment"/> picks up new buffer.</item>
/// </list>
/// </summary>
public sealed class SegmentBufferReader : IDisposable
{
private byte[] _buffer;
private int _writePos;
private int _readPos; // consumer reports consumed position here
private bool _completed;
private readonly ManualResetEventSlim _dataAvailable;
private readonly ILogger? _logger;
// After grow: ALL old buffers are kept alive until Dispose.
// Cannot return them to the pool mid-operation because the consumer thread
// may hold a local reference to any of them (its local 'buffer' variable is
// only refreshed inside TryAdvanceSegment — and the consumer may lag multiple grows behind).
private byte[][]? _oldBuffers;
private int _oldBufferCount;
/// <summary>
/// Creates a new SegmentBufferReader with the specified initial capacity.
/// Typical value: <c>chunkSize * 2</c> (e.g. 8192 for 4096-byte chunks).
/// </summary>
/// <param name="initialCapacity">Initial buffer size. Rounded up by ArrayPool.</param>
/// <param name="logger">Optional logger for diagnostic output (Debug level). Only emits in DEBUG builds.</param>
public SegmentBufferReader(int initialCapacity, ILogger? logger = null)
{
if (initialCapacity <= 0)
throw new ArgumentOutOfRangeException(nameof(initialCapacity));
_buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_dataAvailable = new ManualResetEventSlim(false);
_logger = logger;
}
// --- Producer API (main thread) ---
/// <summary>
/// Appends chunk data to the buffer.
/// Resets positions to 0 when consumer has caught up (zero-cost).
/// Grows as last resort if consumer is behind and buffer is full.
/// Signals the consumer thread that new data is available.
/// </summary>
public void Write(ReadOnlySpan<byte> data)
{
if (data.IsEmpty) return;
// If consumer consumed everything → reset positions to 0 (zero-cost reuse)
var rp = Volatile.Read(ref _readPos);
if (rp > 0 && rp == _writePos)
{
DebugLog("Write reset positions rp={Rp} wp={Wp} → 0", rp, _writePos);
_writePos = 0;
Volatile.Write(ref _readPos, 0);
}
// Grow if buffer can't fit the new data (rare — consumer typically keeps pace)
if (_writePos + data.Length > _buffer.Length)
{
DebugLog("Write grow required wp={Wp} dataLen={DataLen} bufLen={BufLen}",
_writePos, data.Length, _buffer.Length);
Grow(_writePos + data.Length);
}
data.CopyTo(_buffer.AsSpan(_writePos));
var newWritePos = _writePos + data.Length;
Volatile.Write(ref _writePos, newWritePos);
_dataAvailable.Set();
DebugLog("Write dataLen={DataLen} newWritePos={NewWritePos} readPos={ReadPos}",
data.Length, newWritePos, Volatile.Read(ref _readPos));
}
/// <summary>
/// Signals that no more data will be written (CHUNK_END received).
/// The consumer's <see cref="SegmentBufferReaderInput.TryAdvanceSegment"/> will return false
/// once all buffered data is consumed.
/// </summary>
public void Complete()
{
Volatile.Write(ref _completed, true);
_dataAvailable.Set();
DebugLog("Complete writePos={Wp} readPos={Rp}",
Volatile.Read(ref _writePos), Volatile.Read(ref _readPos));
}
// --- Consumer API (deser thread, used by SegmentBufferReaderInput) ---
/// <summary>Current buffer array. May change after grow — consumer must re-read in TryAdvanceSegment.</summary>
internal byte[] Buffer => _buffer;
/// <summary>Current write position. All bytes in [ReadPos..WritePos) are valid.</summary>
internal int WritePos => Volatile.Read(ref _writePos);
/// <summary>Consumer's last reported read position.</summary>
internal int ReadPos => Volatile.Read(ref _readPos);
/// <summary>True after <see cref="Complete"/> is called.</summary>
internal bool IsCompleted => Volatile.Read(ref _completed);
/// <summary>
/// Called by consumer to report how far it has read.
/// Enables the producer to reset positions to 0 when everything is consumed.
/// </summary>
internal void SetReadPos(int position) => Volatile.Write(ref _readPos, position);
/// <summary>Blocks until new data is written or <see cref="Complete"/> is called.</summary>
internal void WaitForData() => _dataAvailable.Wait();
/// <summary>
/// Resets the signal for the double-check pattern:
/// <c>ResetSignal() → check condition → if false, WaitForData()</c>.
/// </summary>
internal void ResetSignal() => _dataAvailable.Reset();
// --- Lifecycle ---
public void Dispose()
{
// Return all old buffers accumulated from grows
if (_oldBuffers != null)
{
for (var i = 0; i < _oldBufferCount; i++)
{
ArrayPool<byte>.Shared.Return(_oldBuffers[i]);
_oldBuffers[i] = null!;
}
_oldBuffers = null;
_oldBufferCount = 0;
}
// Return current buffer
if (_buffer != null!)
{
ArrayPool<byte>.Shared.Return(_buffer);
_buffer = null!;
}
_dataAvailable.Dispose();
}
// --- Internal ---
private void Grow(int requiredCapacity)
{
var newSize = Math.Max(_buffer.Length * 2, requiredCapacity);
var newBuffer = ArrayPool<byte>.Shared.Rent(newSize);
System.Buffer.BlockCopy(_buffer, 0, newBuffer, 0, _writePos);
// Keep the current buffer alive — consumer's local 'buffer' variable may still reference it
// (consumer may lag multiple grows behind before calling TryAdvanceSegment).
// Returning old buffers to the pool mid-operation would cause use-after-free
// if another pool user overwrites them while the consumer is still reading.
if (_oldBuffers == null)
_oldBuffers = new byte[4][];
else if (_oldBufferCount == _oldBuffers.Length)
Array.Resize(ref _oldBuffers, _oldBuffers.Length * 2);
_oldBuffers[_oldBufferCount++] = _buffer;
_buffer = newBuffer;
}
// --- Diagnostic logging (DEBUG builds only — zero cost in RELEASE) ---
/// <summary>
/// Emits a debug log if the logger is attached and Debug level is enabled.
/// Compiled out entirely in RELEASE builds via <see cref="ConditionalAttribute"/>.
/// </summary>
[Conditional("DEBUG")]
internal void DebugLog(string message)
{
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("SegmentBufferReader " + message);
}
[Conditional("DEBUG")]
private void DebugLog(string template, object? arg0)
{
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("SegmentBufferReader " + template, arg0);
}
[Conditional("DEBUG")]
private void DebugLog(string template, object? arg0, object? arg1)
{
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("SegmentBufferReader " + template, arg0, arg1);
}
[Conditional("DEBUG")]
private void DebugLog(string template, object? arg0, object? arg1, object? arg2)
{
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("SegmentBufferReader " + template, arg0, arg1, arg2);
}
}

View File

@ -0,0 +1,120 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
namespace AyCode.Core.Serializers.Binaries;
/// <summary>
/// Binary input that reads from a <see cref="SegmentBufferReader"/> for chunked streaming deserialization.
///
/// Replaces <c>PipeReaderBinaryInput</c>: instead of blocking on <c>PipeReader.ReadAsync()</c>,
/// blocks on <see cref="ManualResetEventSlim"/> when data is exhausted. Much simpler because
/// the buffer is a single contiguous <c>byte[]</c> — no multi-segment iteration, no cross-boundary
/// scratch buffers.
///
/// The deserialization context's hot path reads directly from the buffer array using local
/// <c>buffer</c>/<c>position</c>/<c>bufferLength</c> variables. <see cref="TryAdvanceSegment"/>
/// is only called when <c>position >= bufferLength</c> (cold path), at which point it reports
/// the consumed position via <see cref="SegmentBufferReader.SetReadPos"/>, then either
/// provides more data or blocks until data arrives.
///
/// Position reset: when the producer detects <c>readPos == writePos</c> (all consumed),
/// it resets both to 0. After waking from Wait, this input re-reads the adjusted positions.
/// </summary>
public struct SegmentBufferReaderInput : IBinaryInputBase
{
private readonly SegmentBufferReader _reader;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public SegmentBufferReaderInput(SegmentBufferReader reader)
{
_reader = reader;
}
/// <summary>
/// Provides the initial buffer state. Called once before deserialization begins.
/// Task.Run starts after the first Write() (lazy start in TryParseChunkData),
/// so data is already available — no wait needed.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Initialize(out byte[] buffer, out int position, out int bufferLength)
{
buffer = _reader.Buffer;
position = 0;
bufferLength = _reader.WritePos;
DebugLog("Input.Initialize bufferLength=" + bufferLength);
}
/// <summary>
/// Called when the deserialization context needs more bytes than currently available.
/// Reports consumed position to the producer, then blocks via <see cref="ManualResetEventSlim"/>
/// until enough data arrives or completion is signaled.
///
/// Uses the double-check pattern to avoid missed signals:
/// <c>Reset() → check → if still not enough, Wait()</c>.
///
/// No cross-boundary handling needed — the buffer is a single contiguous <c>byte[]</c>.
/// After grow, re-reads <c>_reader.Buffer</c> to get the new (larger) array.
/// After position reset (readPos/writePos set to 0 by producer), re-reads adjusted positions.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
{
DebugLog("Input.TryAdvanceSegment enter position=" + position + " bufferLength=" + bufferLength + " needed=" + needed);
// Report how far we've consumed — enables producer to reset positions to 0
_reader.SetReadPos(position);
while (true)
{
// Re-read positions (may have been reset to 0 by producer)
int rp = _reader.ReadPos;
int wp = _reader.WritePos;
if (wp - rp >= needed)
{
buffer = _reader.Buffer; // may be new array after grow
position = rp; // may be 0 after reset
bufferLength = wp;
DebugLog("Input.TryAdvanceSegment return true (data available) position=" + position + " bufferLength=" + bufferLength);
return true;
}
if (_reader.IsCompleted)
{
// No more data will arrive. Return whatever is left.
if (wp > rp)
{
buffer = _reader.Buffer;
position = rp;
bufferLength = wp;
DebugLog("Input.TryAdvanceSegment return true (completed, partial) position=" + position + " bufferLength=" + bufferLength);
return true;
}
DebugLog("Input.TryAdvanceSegment return false (completed, empty)");
return false; // end of input
}
// Double-check pattern: Reset → verify → Wait
_reader.ResetSignal();
rp = _reader.ReadPos;
wp = _reader.WritePos;
if (wp - rp >= needed || _reader.IsCompleted)
continue; // re-check from top
DebugLog("Input.TryAdvanceSegment waiting (wp=" + wp + " rp=" + rp + " needed=" + needed + ")");
_reader.WaitForData(); // ManualResetEventSlim.Wait()
DebugLog("Input.TryAdvanceSegment woke up");
}
}
[Conditional("DEBUG")]
private void DebugLog(string message) => _reader.DebugLog(message);
/// <summary>
/// No-op. Buffer lifecycle is managed by <see cref="SegmentBufferReader.Dispose"/>.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Release() { }
}

View File

@ -0,0 +1,171 @@
using System.Buffers;
using System.IO.Pipelines;
using System.Threading;
namespace AyCode.Services.Server.Tests.SignalRs;
/// <summary>
/// Pipe-based test transport for AsyncSegment protocol path.
///
/// Unlike <see cref="SlabTransportWriter"/> (IBufferWriter only), this exposes a real
/// <see cref="PipeWriter"/>, so <c>AcBinaryHubProtocol.WriteMessage</c> can enter
/// AsyncSegment chunked mode (<c>output is PipeWriter</c> check).
///
/// The internal <see cref="Pipe"/> uses a slab-like memory pool with fixed segment size,
/// random offsets and size jitter to better simulate transport behavior.
/// </summary>
internal sealed class AsyncSegmentPipeTransportWriter : IDisposable
{
private readonly Pipe _pipe;
private bool _disposed;
private bool _writerCompleted;
public AsyncSegmentPipeTransportWriter(int segmentSize = 256, int seed = 42)
{
if (segmentSize <= 0)
throw new ArgumentOutOfRangeException(nameof(segmentSize));
_pipe = new Pipe(new PipeOptions(
pool: new SlabSimulatingPool(segmentSize, seed),
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 0,
resumeWriterThreshold: 0,
minimumSegmentSize: segmentSize,
useSynchronizationContext: false));
}
/// <summary>
/// Gets the PipeWriter that must be passed to protocol <c>WriteMessage</c>
/// to activate AsyncSegment chunked write path.
/// </summary>
public PipeWriter Writer => _pipe.Writer;
/// <summary>
/// Gets the paired PipeReader for test-side inspection and parsing.
/// </summary>
public PipeReader Reader => _pipe.Reader;
/// <summary>
/// Completes only the writer side of the internal pipe.
/// Reader remains open so tests can continue draining buffered data.
/// </summary>
public void CompleteWriter()
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (_writerCompleted)
return;
_writerCompleted = true;
_pipe.Writer.Complete();
}
/// <summary>
/// Drains all currently available bytes from the reader into a contiguous array.
/// Does not require completing the writer.
/// </summary>
public byte[] DrainAvailableBytes()
{
ObjectDisposedException.ThrowIf(_disposed, this);
var output = new ArrayBufferWriter<byte>();
while (_pipe.Reader.TryRead(out var result))
{
var buffer = result.Buffer;
foreach (var segment in buffer)
output.Write(segment.Span);
_pipe.Reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
}
return output.WrittenSpan.ToArray();
}
/// <summary>
/// Asynchronously drains all data until the writer is completed and the pipe is exhausted.
/// </summary>
public async Task<byte[]> DrainAllAsync(CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(_disposed, this);
var output = new ArrayBufferWriter<byte>();
while (true)
{
var result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
var buffer = result.Buffer;
foreach (var segment in buffer)
output.Write(segment.Span);
_pipe.Reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
}
return output.WrittenSpan.ToArray();
}
/// <summary>
/// Completes writer and reader sides of the internal pipe.
/// </summary>
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
if (!_writerCompleted)
{
_writerCompleted = true;
_pipe.Writer.Complete();
}
_pipe.Reader.Complete();
}
private sealed class SlabSimulatingPool : MemoryPool<byte>
{
private readonly int _segmentSize;
private readonly Random _rng;
public SlabSimulatingPool(int segmentSize, int seed)
{
_segmentSize = segmentSize;
_rng = new Random(seed);
}
public override int MaxBufferSize => _segmentSize;
public override IMemoryOwner<byte> Rent(int minBufferSize = -1)
{
var requested = minBufferSize > 0 ? minBufferSize : _segmentSize;
var size = Math.Max(requested, _segmentSize);
var offset = _rng.Next(0, Math.Max(1, _segmentSize));
var jitter = _rng.Next(-_segmentSize / 4, _segmentSize / 4 + 1);
var actualSize = Math.Max(16, size + jitter);
var array = new byte[actualSize + offset];
return new Owner(array, offset, actualSize);
}
protected override void Dispose(bool disposing)
{
}
private sealed class Owner(byte[] array, int offset, int length) : IMemoryOwner<byte>
{
public Memory<byte> Memory { get; } = array.AsMemory(offset, length);
public void Dispose()
{
}
}
}
}

View File

@ -1,6 +1,7 @@
using System.Buffers; using System.Buffers;
using System.Diagnostics.CodeAnalysis; using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines; using System.IO.Pipelines;
using AyCode.Core.Serializers.Binaries;
using AyCode.Services.SignalRs; using AyCode.Services.SignalRs;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol; using Microsoft.AspNetCore.SignalR.Protocol;
@ -25,9 +26,12 @@ namespace AyCode.Services.Server.Tests.SignalRs;
internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol
{ {
private const int SegmentSize = 256; private const int SegmentSize = 256;
private readonly BinaryProtocolMode _mode;
public TestMultiSegmentProtocol() public TestMultiSegmentProtocol(BinaryProtocolMode mode = BinaryProtocolMode.Bytes)
: base(AcBinarySerializerOptions.Default, mode)
{ {
_mode = mode;
Options.BufferWriterChunkSize = SegmentSize; Options.BufferWriterChunkSize = SegmentSize;
} }
@ -38,6 +42,9 @@ internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol
/// </summary> /// </summary>
public ReadOnlyMemory<byte> GetMessageBytesMultiSegment(HubMessage message) public ReadOnlyMemory<byte> GetMessageBytesMultiSegment(HubMessage message)
{ {
if (_mode == BinaryProtocolMode.AsyncSegment)
return GetMessageBytesAsyncSegment(message);
// ── Transport-double path (production simulation) ────────────────── // ── Transport-double path (production simulation) ──────────────────
var transport = new SlabTransportWriter(SegmentSize); var transport = new SlabTransportWriter(SegmentSize);
WriteMessage(message, transport); WriteMessage(message, transport);
@ -58,6 +65,18 @@ internal class TestMultiSegmentProtocol : AyCodeBinaryHubProtocol
return transportBytes; return transportBytes;
} }
/// <summary>
/// AsyncSegment write side: uses a real PipeWriter transport so chunked protocol path activates.
/// </summary>
private ReadOnlyMemory<byte> GetMessageBytesAsyncSegment(HubMessage message)
{
using var transport = new AsyncSegmentPipeTransportWriter(SegmentSize);
WriteMessage(message, transport.Writer);
transport.CompleteWriter();
var bytes = transport.DrainAllAsync().GetAwaiter().GetResult();
return bytes;
}
/// <summary> /// <summary>
/// Read side: fill Pipe with 256-byte slab segments → multi-segment ReadOnlySequence. /// Read side: fill Pipe with 256-byte slab segments → multi-segment ReadOnlySequence.
/// Same as production Kestrel PipeReader delivering slab-sized segments. /// Same as production Kestrel PipeReader delivering slab-sized segments.

View File

@ -19,18 +19,19 @@ public class TestableSignalRClient2 : AcSignalRClientBase, IAcSignalRHubItemServ
{ {
private HubConnectionState _connectionState = HubConnectionState.Connected; private HubConnectionState _connectionState = HubConnectionState.Connected;
private readonly TestableSignalRHub2 _signalRHub; private readonly TestableSignalRHub2 _signalRHub;
private readonly TestMultiSegmentProtocol _protocol = new(); private readonly TestMultiSegmentProtocol _protocol;
private readonly TestInvocationBinder _binder = new(); private readonly TestInvocationBinder _binder = new();
/// <summary> /// <summary>
/// Testable SignalR client that allows testing without real HubConnection. /// Testable SignalR client that allows testing without real HubConnection.
/// </summary> /// </summary>
public TestableSignalRClient2(TestableSignalRHub2 signalRHub, TestLogger logger) : base(logger) public TestableSignalRClient2(TestableSignalRHub2 signalRHub, TestLogger logger, BinaryProtocolMode mode = BinaryProtocolMode.Bytes) : base(logger)
{ {
MsDelay = 0; MsDelay = 0;
MsFirstDelay = 0; MsFirstDelay = 0;
_signalRHub = signalRHub; _signalRHub = signalRHub;
_protocol = new TestMultiSegmentProtocol(mode);
} }
#region Override virtual methods for testing #region Override virtual methods for testing

View File

@ -22,7 +22,7 @@ namespace AyCode.Services.Server.Tests.SignalRs;
public class TestableSignalRHub2 : AcWebSignalRHubBase<TestSignalRTags, TestLogger> public class TestableSignalRHub2 : AcWebSignalRHubBase<TestSignalRTags, TestLogger>
{ {
private IAcSignalRHubItemServer _callerClient; private IAcSignalRHubItemServer _callerClient;
private readonly TestMultiSegmentProtocol _protocol = new(); private readonly TestMultiSegmentProtocol _protocol;
private readonly TestInvocationBinder _binder = new(); private readonly TestInvocationBinder _binder = new();
#region Test Configuration #region Test Configuration
@ -49,14 +49,16 @@ public class TestableSignalRHub2 : AcWebSignalRHubBase<TestSignalRTags, TestLogg
#endregion #endregion
public TestableSignalRHub2() public TestableSignalRHub2(BinaryProtocolMode mode = BinaryProtocolMode.Bytes)
: base(new ConfigurationBuilder().Build(), new TestLogger()) : base(new ConfigurationBuilder().Build(), new TestLogger())
{ {
_protocol = new TestMultiSegmentProtocol(mode);
} }
public TestableSignalRHub2(IConfiguration configuration, TestLogger logger) public TestableSignalRHub2(IConfiguration configuration, TestLogger logger, BinaryProtocolMode mode = BinaryProtocolMode.Bytes)
: base(configuration, logger) : base(configuration, logger)
{ {
_protocol = new TestMultiSegmentProtocol(mode);
} }
#region Public Test Entry Points #region Public Test Entry Points

View File

@ -60,8 +60,11 @@ public class AcBinaryHubProtocol : IHubProtocol
protected readonly BinaryProtocolMode _protocolMode; protected readonly BinaryProtocolMode _protocolMode;
protected readonly ILogger? _logger; protected readonly ILogger? _logger;
/// <summary>Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).</summary> /// <summary>
private readonly ConditionalWeakTable<IInvocationBinder, AsyncChunkState>? _chunkStates; /// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).
/// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server.
/// </summary>
private readonly ConditionalWeakTable<IInvocationBinder, AsyncChunkState> _chunkStates;
private sealed class AsyncChunkState private sealed class AsyncChunkState
{ {
@ -69,8 +72,16 @@ public class AcBinaryHubProtocol : IHubProtocol
public object?[] Args = null!; public object?[] Args = null!;
public int StreamedArgIndex; public int StreamedArgIndex;
public Type StreamedArgType = null!; public Type StreamedArgType = null!;
public Pipe InternalPipe = null!; public SegmentBufferReader Buffer = null!;
public Task<object?>? DeserTask; public Task<object?>? DeserTask;
/// <summary>
/// Total bytes of chunk frame data already consumed from the input stream
/// (including [201][UINT16] framing headers + data bytes).
/// Used to skip already-processed chunks when SignalR re-presents the buffer
/// after a false-returning TryParseMessage call.
/// </summary>
public int ChunkFrameBytesConsumed;
} }
public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
@ -81,9 +92,7 @@ public class AcBinaryHubProtocol : IHubProtocol
_options.BufferWriterChunkSize = 4096; _options.BufferWriterChunkSize = 4096;
_protocolMode = protocolMode; _protocolMode = protocolMode;
_logger = logger; _logger = logger;
_chunkStates = protocolMode == BinaryProtocolMode.AsyncSegment _chunkStates = new ConditionalWeakTable<IInvocationBinder, AsyncChunkState>();
? new ConditionalWeakTable<IInvocationBinder, AsyncChunkState>()
: null;
} }
/// <summary> /// <summary>
@ -410,10 +419,36 @@ public class AcBinaryHubProtocol : IHubProtocol
{ {
message = null; message = null;
// AsyncSegment chunk mode: non-standard framing (no INT32 length prefix) // AsyncSegment chunk mode
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState)) if (_chunkStates.TryGetValue(binder, out var chunkState))
{ {
_logger?.LogTrace("TryParseMessage chunk mode active, inputLength={InputLength}", input.Length); // Guard against buffer re-presentation: if SignalR re-submitted the same buffer
// (because our previous fallthrough returned false without advancing),
// the buffer may still contain:
// 1. The already-processed CHUNK_START frame
// 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously)
// Skip both to avoid duplicate writes to state.Buffer.
if (TrySkipRepresentedChunkStart(ref input))
{
_logger?.LogInformation("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length);
// Also skip already-consumed chunk frame bytes (re-presented along with CHUNK_START)
if (chunkState.ChunkFrameBytesConsumed > 0)
{
if (input.Length < chunkState.ChunkFrameBytesConsumed)
{
_logger?.LogWarning("TryParseMessage re-presentation inconsistency: expected >= {Expected} already-consumed bytes but only {Actual} in buffer",
chunkState.ChunkFrameBytesConsumed, input.Length);
return false;
}
input = input.Slice(chunkState.ChunkFrameBytesConsumed);
_logger?.LogInformation("TryParseMessage skipped {Bytes} already-consumed chunk frame bytes, remainingInput={RemainingInput}",
chunkState.ChunkFrameBytesConsumed, input.Length);
}
}
_logger?.LogInformation("TryParseMessage chunk mode active binderHash={BinderHash} inputLength={InputLength} firstByte={FirstByte}",
binder.GetHashCode(), input.Length, input.Length > 0 ? input.FirstSpan[0] : (byte)0);
return TryParseChunkData(ref input, chunkState, binder, out message); return TryParseChunkData(ref input, chunkState, binder, out message);
} }
@ -429,27 +464,64 @@ public class AcBinaryHubProtocol : IHubProtocol
message = ParseMessage(ref reader, payloadLength, binder); message = ParseMessage(ref reader, payloadLength, binder);
input = input.Slice(LengthPrefixSize + payloadLength);
if (message != null) if (message != null)
{ {
input = input.Slice(LengthPrefixSize + payloadLength);
if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name); _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name);
return true; return true;
} }
// CHUNK_START consumed but no message yet — chunk mode just activated. // CHUNK_START consumed but no complete HubMessage yet (chunk mode just activated).
// Must try chunk data immediately; returning false here would cause SignalR // Try to process any remaining chunk data already in the buffer.
// to call AdvanceTo(examined=end) and wait for new data, even though if (_chunkStates.TryGetValue(binder, out chunkState))
// CHUNK_DATA/CHUNK_END may already be in the remaining buffer.
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState))
{ {
_logger?.LogDebug("TryParseMessage CHUNK_START activated, fallthrough to TryParseChunkData remainingInput={RemainingInput}", input.Length); var afterChunkStart = input.Slice(LengthPrefixSize + payloadLength);
return TryParseChunkData(ref input, chunkState, binder, out message); if (TryParseChunkData(ref afterChunkStart, chunkState, binder, out message))
{
// Full chunked message processed in one call
input = afterChunkStart;
_logger?.LogInformation("TryParseMessage CHUNK_START + chunk data processed in single call");
return true;
}
// IMPORTANT: do NOT advance input when returning false.
// SignalR's contract is "advance only on success". If we advance here,
// the buffer state becomes inconsistent on re-submission.
// On next call, the buffer may re-present CHUNK_START bytes; the chunk-mode
// block above handles that via TrySkipRepresentedChunkStart.
_logger?.LogInformation("TryParseMessage CHUNK_START parsed, state added, waiting for chunk data (not advancing)");
return false;
} }
return false; return false;
} }
/// <summary>
/// Detects if the buffer starts with a re-presented CHUNK_START frame pattern
/// ([INT32 length][CHUNK_START marker]). If so, advances <paramref name="input"/>
/// past the entire frame and returns true.
///
/// This guards against the case where SignalR's buffer management re-presents
/// bytes we logically consumed during a previous false-returning TryParseMessage call.
/// </summary>
private static bool TrySkipRepresentedChunkStart(ref ReadOnlySequence<byte> input)
{
if (input.Length < LengthPrefixSize + 1) return false;
Span<byte> header = stackalloc byte[LengthPrefixSize + 1];
input.Slice(0, LengthPrefixSize + 1).CopyTo(header);
int maybeLen = System.Buffers.Binary.BinaryPrimitives.ReadInt32LittleEndian(header.Slice(0, LengthPrefixSize));
byte maybeMarker = header[LengthPrefixSize];
if (maybeMarker != MsgAsyncChunkStart) return false;
if (maybeLen <= 0 || input.Length < LengthPrefixSize + maybeLen) return false;
input = input.Slice(LengthPrefixSize + maybeLen);
return true;
}
private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder) private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder)
{ {
if (payloadLength == 0) if (payloadLength == 0)
@ -637,51 +709,64 @@ public class AcBinaryHubProtocol : IHubProtocol
_logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length); _logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length);
// Write chunk data to internal pipe for background deserialization // Write chunk data to SegmentBufferReader for background deserialization
if (chunkDataSize > 0) if (chunkDataSize > 0)
{ {
var dataSlice = input.Slice(3, chunkDataSize); var dataSlice = input.Slice(3, chunkDataSize);
foreach (var segment in dataSlice) foreach (var segment in dataSlice)
state.InternalPipe.Writer.Write(segment.Span); state.Buffer.Write(segment.Span);
SyncFlush(state.InternalPipe.Writer.FlushAsync());
} }
// Lazy start: begin background deserialization after first chunk is in the pipe. // Lazy start: begin background deserialization after first chunk is written.
// Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available. // SegmentBufferReaderInput.Initialize reads the already-written data immediately.
if (state.DeserTask == null) if (state.DeserTask == null)
{ {
_logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name); _logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name);
var pipeReader = state.InternalPipe.Reader; var reader = state.Buffer;
var type = state.StreamedArgType; var type = state.StreamedArgType;
var opts = _options; var opts = _options;
state.DeserTask = Task.Run(() => (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts)); state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(reader, type, opts));
} }
input = input.Slice(totalNeeded); input = input.Slice(totalNeeded);
state.ChunkFrameBytesConsumed += totalNeeded;
continue; // try next chunk immediately continue; // try next chunk immediately
} }
if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data) if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data)
{ {
_logger?.LogDebug("TryParseChunkData [202] CHUNK_END — completing pipe"); _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion");
// Signal end of data → background deser task completes // Signal end of data → background deser task completes
state.InternalPipe.Writer.Complete(); state.Buffer.Complete();
object? deserializedArg = null; object? deserializedArg = null;
if (state.DeserTask != null) try
{ {
deserializedArg = state.DeserTask.GetAwaiter().GetResult(); if (state.DeserTask != null)
state.InternalPipe.Reader.Complete(); {
deserializedArg = state.DeserTask.GetAwaiter().GetResult();
if (_logger!.IsEnabled(LogLevel.Debug)) _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null");
}
}
catch (Exception ex)
{
_logger?.LogError(ex, "TryParseChunkData deserialization FAILED targetType={TargetType}", state.StreamedArgType.Name);
throw;
}
finally
{
_logger?.LogDebug("TryParseChunkData [202] cleanup: Buffer.Dispose + _chunkStates.Remove");
state.Buffer.Dispose();
_chunkStates.Remove(binder);
} }
// Fill the placeholder in the stored message's args // Fill the placeholder in the stored message's args
FillStreamedArg(state, deserializedArg); FillStreamedArg(state, deserializedArg);
_chunkStates!.Remove(binder);
input = input.Slice(1); // consume the single [202] byte input = input.Slice(1); // consume the single [202] byte
message = state.PartialMessage; message = state.PartialMessage;
@ -690,7 +775,16 @@ public class AcBinaryHubProtocol : IHubProtocol
} }
// Unknown byte in chunk mode — break out (shouldn't happen) // Unknown byte in chunk mode — break out (shouldn't happen)
_logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking", firstByte); _logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking. " +
"binderHash={BinderHash} inputLength={InputLength} " +
"state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} bufferWritePos={WritePos} bufferReadPos={ReadPos}",
firstByte,
binder.GetHashCode(),
input.Length,
state.StreamedArgType.Name,
state.DeserTask?.Status.ToString() ?? "null",
state.Buffer.WritePos,
state.Buffer.ReadPos);
break; break;
} }
@ -699,7 +793,7 @@ public class AcBinaryHubProtocol : IHubProtocol
/// <summary> /// <summary>
/// Parses CHUNK_START: reads original message (with -1 marker for streamed arg), /// Parses CHUNK_START: reads original message (with -1 marker for streamed arg),
/// creates internal Pipe, starts background deserialization task, stores state. /// creates SegmentBufferReader, stores state. Background deser task starts lazily on first chunk.
/// Returns null to signal "consumed bytes, no complete message yet". /// Returns null to signal "consumed bytes, no complete message yet".
/// </summary> /// </summary>
private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder) private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder)
@ -733,11 +827,13 @@ public class AcBinaryHubProtocol : IHubProtocol
Args = args, Args = args,
StreamedArgIndex = streamedIndex, StreamedArgIndex = streamedIndex,
StreamedArgType = streamedType, StreamedArgType = streamedType,
InternalPipe = new Pipe() Buffer = new SegmentBufferReader(_options.BufferWriterChunkSize * 2, _logger)
// DeserTask started lazily in TryParseChunkData after first chunk is written // DeserTask started lazily in TryParseChunkData after first chunk is written
}; };
_chunkStates!.AddOrUpdate(binder, state); _chunkStates.AddOrUpdate(binder, state);
_logger?.LogInformation("ParseAsyncChunkStart _chunkStates.AddOrUpdate binderHash={BinderHash} streamedArgType={TargetType}",
binder.GetHashCode(), streamedType.Name);
return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData
} }