Chunked framing for AsyncSegment: zero-copy SignalR ser/deser

Implement self-describing chunked protocol ([201][UINT16][data], [202] end) for AsyncSegment mode, enabling true zero-copy, pipeline-parallel serialization/deserialization of large arguments in SignalR.
- AsyncPipeWriterOutput now reserves a 3-byte header per chunk and supports two backpressure modes.
- AcBinaryHubProtocol routes streamable arguments through WriteMessageChunked, with chunk accumulation and background deserialization on the receiver.
- Logging now uses ILogger; documentation and wire format details updated.
- Consumer code updated to use new mode and diagnostics.
- Improves throughput, memory usage, and maintainability for large payloads.
This commit is contained in:
Loretta 2026-04-11 10:35:03 +02:00
parent 83350e43f6
commit 82a407ff82
9 changed files with 570 additions and 102 deletions

View File

@ -423,24 +423,28 @@ public static partial class AcBinarySerializer
} }
/// <summary> /// <summary>
/// Serialize to PipeWriter with segment streaming (flush per chunk via AsyncPipeWriterOutput). /// Serialize to PipeWriter with chunked protocol framing via AsyncPipeWriterOutput.
/// Each chunk is flushed to the network as it fills, enabling pipeline parallelism. /// Each chunk (including the last) is framed as <c>[201][UINT16 size][data]</c> and committed
/// Returns total bytes written. /// to the PipeWriter via Advance (zero-copy). The protocol layer writes a single <c>[202]</c>
/// byte after to signal end-of-stream.
/// </summary> /// </summary>
public static int Serialize<T>(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) /// <returns>Total serialized data bytes (excluding framing overhead).</returns>
public static int Serialize<T>(
T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options,
bool waitForFlush = true)
{ {
if (value == null) if (value == null)
{ {
// Null: write directly, no chunking needed
var span = pipeWriter.GetSpan(1); var span = pipeWriter.GetSpan(1);
span[0] = BinaryTypeCode.Null; span[0] = BinaryTypeCode.Null;
pipeWriter.Advance(1); pipeWriter.Advance(1);
pipeWriter.FlushAsync().GetAwaiter().GetResult();
return 1; return 1;
} }
var runtimeType = value.GetType(); var runtimeType = value.GetType();
var context = BinarySerializationContextPool<AsyncPipeWriterOutput>.Get(options); var context = BinarySerializationContextPool<AsyncPipeWriterOutput>.Get(options);
context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize); context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush);
context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd);
try try

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Buffers; using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
@ -9,114 +10,115 @@ using AyCode.Core.Helpers;
namespace AyCode.Core.Serializers.Binaries; namespace AyCode.Core.Serializers.Binaries;
/// <summary> /// <summary>
/// Binary output that writes to a PipeWriter with per-chunk network flush. /// Binary output that writes to a PipeWriter with per-chunk network flush and self-describing framing.
/// ///
/// Identical to BufferWriterBinaryOutput except: Grow() calls PipeWriter.FlushAsync().Forget() /// Each chunk (including the last) is framed as <c>[201][UINT16 size][data]</c> with a 3-byte header
/// after committing each chunk, so data flows to the network as it's being serialized /// reserved at the start of each buffer. The serializer context writes into the space after the
/// rather than waiting for the full serialization to complete. /// reserved bytes; on Grow(), the header is patched and the full chunk is committed via Advance
/// and flushed to the network. Flush() does the same for the last (partial) chunk — zero-copy
/// for both intermediate and final chunks.
/// ///
/// Backpressure: stores the last FlushAsync ValueTask. If the previous flush hasn't completed /// The protocol layer writes a single <c>[202]</c> byte after all chunks to signal end-of-stream.
/// by the next Grow(), blocks until it does. This bounds memory to ~2 chunks.
/// ///
/// The first Grow() skips the flush to keep the length prefix span valid for patching. /// Backpressure modes (controlled by <c>waitForFlush</c> constructor parameter):
/// <list type="bullet">
/// <item><c>waitForFlush=true</c> (default): Grow() blocks if the previous FlushAsync hasn't completed.
/// Bounds memory to ~2 chunks in flight.</item>
/// <item><c>waitForFlush=false</c>: Grow() never blocks. Data accumulates in the PipeWriter's internal
/// buffer and is sent with the next completed flush. Maximum serialization throughput.</item>
/// </list>
///
/// Maximum chunk data size: 65535 bytes (UINT16 max).
/// </summary> /// </summary>
public struct AsyncPipeWriterOutput : IBinaryOutputBase public struct AsyncPipeWriterOutput : IBinaryOutputBase
{ {
/// <summary>MsgAsyncChunkData type marker (201).</summary>
private const byte ChunkDataMarker = 201;
/// <summary>Header size: 1 byte type + 2 bytes UINT16 size.</summary>
private const int HeaderSize = 3;
/// <summary>Maximum chunk data size (UINT16 max).</summary>
public const int MaxChunkSize = ushort.MaxValue;
private readonly PipeWriter _pipeWriter; private readonly PipeWriter _pipeWriter;
private readonly int _chunkSize; private readonly int _chunkSize;
private readonly bool _waitForFlush;
private int _committedBytes; private int _committedBytes;
private int _currentChunkStart; private int _currentChunkStart;
private bool _ownedBuffer; private bool _ownedBuffer;
private ValueTask<FlushResult> _lastFlush; private ValueTask<FlushResult> _lastFlush;
private bool _firstGrow;
public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096) public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true)
{ {
if (chunkSize > MaxChunkSize)
throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize,
$"Chunk size cannot exceed {MaxChunkSize} (UINT16 max).");
_pipeWriter = pipeWriter; _pipeWriter = pipeWriter;
_chunkSize = chunkSize; _chunkSize = chunkSize;
_waitForFlush = waitForFlush;
_committedBytes = 0; _committedBytes = 0;
_ownedBuffer = false; _ownedBuffer = false;
_lastFlush = default; _lastFlush = default;
_firstGrow = true;
} }
/// <summary> /// <summary>
/// Provides the initial buffer from the PipeWriter. /// Provides the initial buffer from the PipeWriter with 3-byte header reservation.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Initialize(out byte[] buffer, out int position, out int bufferEnd) public void Initialize(out byte[] buffer, out int position, out int bufferEnd)
{ {
_committedBytes = 0; _committedBytes = 0;
_lastFlush = default; _lastFlush = default;
_firstGrow = true;
AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd);
_currentChunkStart = position; _currentChunkStart = position;
} }
/// <summary> /// <summary>
/// Called when the context's buffer is full. Commits current chunk to the PipeWriter, /// Called when the context's buffer is full. Patches the chunk header [201][UINT16 size],
/// fires a background flush (except on the first call — length prefix must stay valid), /// commits the chunk to the PipeWriter, and fires a background flush.
/// and acquires a new chunk.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed)
{ {
// Backpressure: wait for previous flush if still in progress // Backpressure: wait for previous flush if still in progress
if (!_lastFlush.IsCompleted) if (_waitForFlush && !_lastFlush.IsCompleted)
_lastFlush.GetAwaiter().GetResult(); _lastFlush.GetAwaiter().GetResult();
// Commit bytes written in current chunk CommitCurrentChunk(buffer, position);
var bytesInChunk = position - _currentChunkStart;
if (bytesInChunk > 0)
{
if (_ownedBuffer)
FlushOwnedBuffer(buffer, bytesInChunk);
else
_pipeWriter.Advance(bytesInChunk);
_committedBytes += bytesInChunk;
}
// Fire-and-forget flush — EXCEPT first chunk (length prefix span must stay valid) // Fire-and-forget flush when previous is done
if (!_firstGrow) if (_lastFlush.IsCompleted)
{ {
_lastFlush = _pipeWriter.FlushAsync(); _lastFlush = _pipeWriter.FlushAsync();
_lastFlush.Forget(); _lastFlush.Forget();
} }
_firstGrow = false;
// Acquire new chunk // Acquire new chunk with header reservation
AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd); AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd);
_currentChunkStart = position; _currentChunkStart = position;
} }
/// <summary> /// <summary>
/// Returns total bytes written: committed + pending in current chunk. /// Returns total serialized data bytes (excluding framing overhead).
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetTotalPosition(int currentPosition) public int GetTotalPosition(int currentPosition)
=> _committedBytes + (currentPosition - _currentChunkStart); => _committedBytes + (currentPosition - _currentChunkStart);
/// <summary> /// <summary>
/// Commits final pending bytes and performs a synchronous flush. /// Commits the last (partial) chunk to the PipeWriter with [201][UINT16 size] header.
/// Must be called after all writes are complete. /// Zero-copy: patches the reserved header bytes and calls Advance — no data copying.
/// Does NOT flush to network — the protocol writes [202] and flushes after.
/// </summary> /// </summary>
public void Flush(byte[] buffer, int position) public void Flush(byte[] buffer, int position)
{ {
// Wait for any in-flight flush // Wait for any in-flight flush from previous Grow
if (!_lastFlush.IsCompleted) if (!_lastFlush.IsCompleted)
_lastFlush.GetAwaiter().GetResult(); _lastFlush.GetAwaiter().GetResult();
var bytesInChunk = position - _currentChunkStart; CommitCurrentChunk(buffer, position);
if (bytesInChunk > 0)
{
if (_ownedBuffer)
FlushOwnedBuffer(buffer, bytesInChunk);
else
_pipeWriter.Advance(bytesInChunk);
}
// Final synchronous flush — ensures all data reaches the network
_pipeWriter.FlushAsync().GetAwaiter().GetResult();
} }
/// <summary> /// <summary>
@ -124,35 +126,57 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
/// </summary> /// </summary>
public void Reset() { } public void Reset() { }
[MethodImpl(MethodImplOptions.NoInlining)] /// <summary>
private void FlushOwnedBuffer(byte[] buffer, int bytesInChunk) /// Patches [201][UINT16 dataBytes] into the reserved header and commits via Advance.
/// For owned buffers, copies to PipeWriter first.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void CommitCurrentChunk(byte[] buffer, int position)
{ {
var span = _pipeWriter.GetSpan(bytesInChunk); var dataBytes = position - _currentChunkStart;
buffer.AsSpan(_currentChunkStart, bytesInChunk).CopyTo(span); if (dataBytes <= 0) return;
_pipeWriter.Advance(bytesInChunk);
var headerStart = _currentChunkStart - HeaderSize;
buffer[headerStart] = ChunkDataMarker;
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes);
if (_ownedBuffer)
FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes);
else
_pipeWriter.Advance(HeaderSize + dataBytes);
_committedBytes += dataBytes; // only count data bytes, not framing
}
[MethodImpl(MethodImplOptions.NoInlining)]
private void FlushOwnedBuffer(byte[] buffer, int start, int length)
{
var span = _pipeWriter.GetSpan(length);
buffer.AsSpan(start, length).CopyTo(span);
_pipeWriter.Advance(length);
ArrayPool<byte>.Shared.Return(buffer); ArrayPool<byte>.Shared.Return(buffer);
_ownedBuffer = false; _ownedBuffer = false;
} }
private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd)
{ {
var actualRequest = Math.Max(requestSize, _chunkSize); var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize);
var memory = _pipeWriter.GetMemory(actualRequest); var totalRequest = dataSize + HeaderSize;
var memory = _pipeWriter.GetMemory(totalRequest);
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) && segment.Array != null) if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) && segment.Array != null)
{ {
buffer = segment.Array; buffer = segment.Array;
position = segment.Offset; position = segment.Offset + HeaderSize;
bufferEnd = segment.Offset + segment.Count; bufferEnd = segment.Offset + HeaderSize + dataSize;
_ownedBuffer = false; _ownedBuffer = false;
} }
else else
{ {
// Fallback for non-array-backed PipeWriter (e.g. Kestrel PinnedBlockMemoryPool) var owned = ArrayPool<byte>.Shared.Rent(totalRequest);
var owned = ArrayPool<byte>.Shared.Rent(actualRequest);
buffer = owned; buffer = owned;
position = 0; position = HeaderSize;
bufferEnd = owned.Length; bufferEnd = HeaderSize + dataSize;
_ownedBuffer = true; _ownedBuffer = true;
} }
} }

View File

@ -60,6 +60,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0) if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0)
throw new AcBinaryDeserializationException("Empty pipe — no data to read."); throw new AcBinaryDeserializationException("Empty pipe — no data to read.");
_consumedUpTo = _nextSegmentPosition; // Mark first segment as consumed (same as TryLoadNextSegmentFromBuffer)
ExtractArray(memory, out buffer, out position, out bufferLength); ExtractArray(memory, out buffer, out position, out bufferLength);
} }

View File

@ -103,27 +103,40 @@ void Release();
## AsyncPipeWriterOutput ## AsyncPipeWriterOutput
`struct AsyncPipeWriterOutput : IBinaryOutputBase` — writes to `PipeWriter` with per-chunk network flush. Enables segment-level streaming: each serializer chunk goes to the network immediately. `struct AsyncPipeWriterOutput : IBinaryOutputBase` — writes to `PipeWriter` with per-chunk network flush and **self-describing chunked framing**. Each chunk is framed as `[201][UINT16 size][data]` — zero-copy for both intermediate and final chunks.
### Differences from BufferWriterBinaryOutput ### Chunked Protocol Framing
Same cached chunk pattern (`GetMemory` → `TryGetArray` → direct array writes), but `Grow()` flushes the current chunk to the network before acquiring the next: Each chunk has a 3-byte header reserved via **header reservation** (skip 3 bytes in `AcquireChunk`, patch before `Advance`):
1. Wait for previous flush if still in-flight (`_lastFlush` backpressure) 1. `AcquireChunk`: request `chunkSize + 3` from PipeWriter, set `position = offset + 3` (skip reserved header), force `bufferEnd = offset + 3 + chunkSize`
2. `Advance(bytesInChunk)` — commit to `PipeWriter` 2. Context writes serializer data into `buffer[position..bufferEnd]`
3. `FlushAsync().Forget()` — fire-and-forget network send 3. `Grow()`: patch `[201][UINT16 dataBytes]` header, `Advance(3 + dataBytes)`, `FlushAsync().Forget()`
4. Acquire next chunk via `GetMemory` 4. `Flush()`: same as Grow — patch header, `Advance(3 + dataBytes)`. Zero-copy, no data copying. The protocol writes a single `[202]` byte after.
**First-Grow skip:** the first `Grow()` call does NOT flush — the length prefix span (reserved by the protocol before serialization) must stay valid until patching. `_firstGrow` flag controls this. ### Backpressure Modes
**Backpressure:** `_lastFlush` (ValueTask<FlushResult>) tracks the most recent flush. If the serializer produces chunks faster than the network consumes them, the next `Grow()` waits — max ~2 chunks in memory at any time. Constructor parameter `waitForFlush` (default `true`):
- **`waitForFlush=true`**: `Grow()` blocks if previous `FlushAsync` is still in-flight. Max ~2 chunks in memory.
- **`waitForFlush=false`**: `Grow()` never blocks. Data accumulates in PipeWriter's internal buffer and is sent with the next completed flush. Maximum serialization throughput.
In both modes, flush is only initiated when `_lastFlush.IsCompleted` — no overlapping FlushAsync calls.
### Wire Format (per chunk)
```
CHUNK_DATA: [201][UINT16 size][data bytes] — every chunk (self-describing, variable size)
CHUNK_END: [202] — end signal (1 byte, no data)
```
Max chunk data size: 65535 bytes (UINT16 max).
### Usage ### Usage
Selected via `BinaryProtocolMode.AsyncSegment` in `AcBinaryHubProtocol`. The protocol casts `IBufferWriter<byte> output` to `PipeWriter` (safe — SignalR always provides `PipeWriter`). Selected via `BinaryProtocolMode.AsyncSegment` in `AcBinaryHubProtocol`. The protocol's `WriteMessageChunked` method sends CHUNK_START (standard SignalR framing), then calls the serializer which writes all chunks via `AsyncPipeWriterOutput`, then the protocol writes `[202]`.
```csharp ```csharp
AcBinarySerializer.Serialize(value, (PipeWriter)output, options) // AsyncPipeWriterOutput path AcBinarySerializer.Serialize(value, pipeWriter, options);
// All chunks already committed to PipeWriter. Protocol writes [202] and flushes.
``` ```
> Known issues and limitations: `BINARY_ISSUES.md`

View File

@ -1,6 +1,7 @@
using System.Buffers; using System.Buffers;
using System.Diagnostics; using System.Diagnostics;
using System.Diagnostics.CodeAnalysis; using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Text; using System.Text;
@ -8,6 +9,7 @@ using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol; using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
namespace AyCode.Services.SignalRs; namespace AyCode.Services.SignalRs;
@ -46,16 +48,42 @@ public class AcBinaryHubProtocol : IHubProtocol
private const byte MsgAck = 8; private const byte MsgAck = 8;
private const byte MsgSequence = 9; private const byte MsgSequence = 9;
// Chunked protocol framing for AsyncSegment mode
private const byte MsgAsyncChunkStart = 200;
private const byte MsgAsyncChunkData = 201;
private const byte MsgAsyncChunkEnd = 202;
/// <summary>Sentinel object placed in the args array for the streamed argument (replaced after chunk deserialization).</summary>
protected static readonly object StreamedArgPlaceholder = new();
protected volatile AcBinarySerializerOptions _options; protected volatile AcBinarySerializerOptions _options;
protected readonly BinaryProtocolMode _protocolMode; protected readonly BinaryProtocolMode _protocolMode;
protected readonly ILogger? _logger;
/// <summary>Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).</summary>
private readonly ConditionalWeakTable<IInvocationBinder, AsyncChunkState>? _chunkStates;
private sealed class AsyncChunkState
{
public HubMessage PartialMessage = null!;
public object?[] Args = null!;
public int StreamedArgIndex;
public Type StreamedArgType = null!;
public Pipe InternalPipe = null!;
public Task<object?>? DeserTask;
}
public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes) public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null)
{ {
_options = options; _options = options;
_options.BufferWriterChunkSize = 4096; _options.BufferWriterChunkSize = 4096;
_protocolMode = protocolMode; _protocolMode = protocolMode;
_logger = logger;
_chunkStates = protocolMode == BinaryProtocolMode.AsyncSegment
? new ConditionalWeakTable<IInvocationBinder, AsyncChunkState>()
: null;
} }
/// <summary> /// <summary>
@ -88,6 +116,15 @@ public class AcBinaryHubProtocol : IHubProtocol
public void WriteMessage(HubMessage message, IBufferWriter<byte> output) public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
{ {
// AsyncSegment: chunked protocol framing for messages with streamable arguments
if (_protocolMode == BinaryProtocolMode.AsyncSegment
&& output is PipeWriter pipeWriter
&& HasStreamableArgs(message))
{
WriteMessageChunked(message, pipeWriter);
return;
}
// Reserve outer length prefix directly on the pipe (before BWO takes over) // Reserve outer length prefix directly on the pipe (before BWO takes over)
var lengthSpan = output.GetSpan(LengthPrefixSize); var lengthSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize); output.Advance(LengthPrefixSize);
@ -202,12 +239,159 @@ public class AcBinaryHubProtocol : IHubProtocol
#endregion #endregion
#region Chunked Protocol (AsyncSegment write)
/// <summary>
/// Returns true if the message has arguments that should be streamed via chunked protocol.
/// Only non-null, non-byte[] arguments go through the chunked path.
/// </summary>
private static bool HasStreamableArgs(HubMessage message) => message switch
{
InvocationMessage m => HasNonByteArrayArg(m.Arguments),
StreamInvocationMessage m => HasNonByteArrayArg(m.Arguments),
StreamItemMessage m => m.Item != null && m.Item is not byte[],
CompletionMessage m => m.HasResult && m.Result != null && m.Result is not byte[],
_ => false
};
private static bool HasNonByteArrayArg(object?[] args)
{
for (var i = args.Length - 1; i >= 0; i--)
{
if (args[i] != null && args[i] is not byte[])
return true;
}
return false;
}
/// <summary>
/// Gets the last non-null, non-byte[] argument value and its index for streaming.
/// </summary>
private static (object? value, int index) GetStreamedArg(HubMessage message) => message switch
{
InvocationMessage m => GetLastNonByteArrayArg(m.Arguments),
StreamInvocationMessage m => GetLastNonByteArrayArg(m.Arguments),
StreamItemMessage m => (m.Item, 0),
CompletionMessage m => (m.Result, 0),
_ => (null, -1)
};
private static (object? value, int index) GetLastNonByteArrayArg(object?[] args)
{
for (var i = args.Length - 1; i >= 0; i--)
{
if (args[i] != null && args[i] is not byte[])
return (args[i], i);
}
return (null, -1);
}
/// <summary>
/// Writes a message using chunked protocol framing for AsyncSegment mode.
/// CHUNK_START: standard SignalR framed message with INT32 -1 for the streamed arg.
/// CHUNK_DATA: [201][UINT16 size][data] per chunk (written by AsyncPipeWriterOutput, zero-copy).
/// CHUNK_END: [202] (1 byte, no data — all data already committed by output).
/// </summary>
private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter)
{
var (streamedArg, streamedArgIndex) = GetStreamedArg(message);
// --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) ---
{
var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize);
pipeWriter.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize);
int externalBytes = 0;
bw.WriteByte(MsgAsyncChunkStart);
// Write original message body with INT32 -1 for the streamed arg
switch (message)
{
case InvocationMessage m:
bw.WriteByte(MsgInvocation);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
break;
case StreamInvocationMessage m:
bw.WriteByte(MsgStreamInvocation);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
break;
case StreamItemMessage m:
bw.WriteByte(MsgStreamItem);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteRaw(-1); // streamed arg marker
WriteHeaders(ref bw, m.Headers);
break;
case CompletionMessage m:
bw.WriteByte(MsgCompletion);
bw.WriteStringUtf8(m.InvocationId!);
WriteNullableString(ref bw, m.Error);
bw.WriteByte(1); // hasResult = true
bw.WriteRaw(-1); // streamed arg marker
WriteHeaders(ref bw, m.Headers);
break;
}
var totalPayload = bw.Position + externalBytes;
bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
}
pipeWriter.FlushAsync().GetAwaiter().GetResult();
// --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
if (streamedArg != null)
AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options);
// --- CHUNK_END [202] ---
var endByte = pipeWriter.GetSpan(1);
endByte[0] = MsgAsyncChunkEnd;
pipeWriter.Advance(1);
pipeWriter.FlushAsync().GetAwaiter().GetResult();
}
/// <summary>
/// Writes arguments for CHUNK_START: all args normally except the streamed one (INT32 -1 marker).
/// </summary>
private void WriteArgumentsChunked(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output,
object?[] arguments, int streamedArgIndex, ref int externalBytes)
{
bw.WriteVarUInt((uint)arguments.Length);
for (var i = 0; i < arguments.Length; i++)
{
if (i == streamedArgIndex)
{
bw.WriteRaw(-1); // streamed arg placeholder
continue;
}
WriteArgument(ref bw, output, arguments[i], ref externalBytes);
}
}
#endregion
#region TryParseMessage #region TryParseMessage
public virtual bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message) public virtual bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
{ {
message = null; message = null;
// AsyncSegment chunk mode: non-standard framing (no INT32 length prefix)
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState))
return TryParseChunkData(ref input, chunkState, binder, out message);
// Normal path
var reader = new SequenceReader<byte>(input); var reader = new SequenceReader<byte>(input);
if (!reader.TryReadLittleEndian(out int payloadLength)) if (!reader.TryReadLittleEndian(out int payloadLength))
return false; return false;
@ -218,7 +402,17 @@ public class AcBinaryHubProtocol : IHubProtocol
message = ParseMessage(ref reader, payloadLength, binder); message = ParseMessage(ref reader, payloadLength, binder);
input = input.Slice(LengthPrefixSize + payloadLength); input = input.Slice(LengthPrefixSize + payloadLength);
return message != null; if (message != null)
return true;
// CHUNK_START consumed but no message yet — chunk mode just activated.
// Must try chunk data immediately; returning false here would cause SignalR
// to call AdvanceTo(examined=end) and wait for new data, even though
// CHUNK_DATA/CHUNK_END may already be in the remaining buffer.
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState))
return TryParseChunkData(ref input, chunkState, binder, out message);
return false;
} }
private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder) private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder)
@ -242,36 +436,39 @@ public class AcBinaryHubProtocol : IHubProtocol
MsgClose => ParseClose(ref r), MsgClose => ParseClose(ref r),
MsgAck => new AckMessage(ReadInt64(ref r)), MsgAck => new AckMessage(ReadInt64(ref r)),
MsgSequence => new SequenceMessage(ReadInt64(ref r)), MsgSequence => new SequenceMessage(ReadInt64(ref r)),
MsgAsyncChunkStart => ParseAsyncChunkStart(ref r, binder),
_ => null _ => null
}; };
} }
/// <summary> /// <summary>
/// Diagnostic logger for protocol-level debugging. /// Legacy diagnostic logger. Use ILogger via constructor instead.
/// Set to non-null to log target method, arg count, param types during ParseInvocation.
/// </summary> /// </summary>
[Obsolete("Use ILogger via constructor parameter instead. This property will be removed in a future version.")]
public static Action<string>? DiagnosticLogger { get; set; } public static Action<string>? DiagnosticLogger { get; set; }
[Conditional("DEBUG")] [Conditional("DEBUG")]
private static void LogDiagnostic(string message) => DiagnosticLogger?.Invoke(message); private void LogDiagnostic(string message) => _logger?.LogDebug(message);
[Conditional("DEBUG")] [Conditional("DEBUG")]
private static void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType) private void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType)
{ {
if (DiagnosticLogger == null) return; if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
var segmentCount = 0; var segmentCount = 0;
foreach (var _ in argSlice) foreach (var _ in argSlice)
segmentCount++; segmentCount++;
DiagnosticLogger($"[AcBinaryHubProtocol] ReadSingleArgument: argLength={argLength}, isSingleSegment={argSlice.IsSingleSegment}, segments={segmentCount}, type={targetType.Name}"); _logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}",
argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name);
} }
[Conditional("DEBUG")] [Conditional("DEBUG")]
private static void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining) private void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining)
{ {
if (DiagnosticLogger == null) return; if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
var typeNames = new string[paramTypes.Count]; var typeNames = new string[paramTypes.Count];
for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name; for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name;
DiagnosticLogger($"[AcBinaryHubProtocol] ParseInvocation target='{target}'; paramTypes.Count={paramTypes.Count}; types=[{string.Join(", ", typeNames)}]; remaining={remaining}"); _logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}",
target, paramTypes.Count, string.Join(", ", typeNames), remaining);
} }
private HubMessage ParseInvocation(ref SequenceReader<byte> r, IInvocationBinder binder) private HubMessage ParseInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
@ -378,6 +575,217 @@ public class AcBinaryHubProtocol : IHubProtocol
#endregion #endregion
#region Chunked Protocol (AsyncSegment read)
/// <summary>
/// Processes CHUNK_DATA and CHUNK_END in chunk accumulation mode.
/// Called from TryParseMessage when an active AsyncChunkState exists for this connection.
/// Loops over all available chunks — critical because SignalR's while loop exits when
/// TryParseMessage returns false, and won't re-enter until new data arrives on the pipe.
/// </summary>
private bool TryParseChunkData(ref ReadOnlySequence<byte> input, AsyncChunkState state,
IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
{
message = null;
while (input.Length >= 1)
{
var firstByte = input.FirstSpan[0];
if (firstByte == MsgAsyncChunkData) // 201 — self-describing data chunk [201][UINT16 size][data]
{
// Need at least [201][UINT16]
if (input.Length < 3) return false;
// Read UINT16 chunk data size
var headerSlice = input.Slice(1, 2);
Span<byte> sizeBytes = stackalloc byte[2];
headerSlice.CopyTo(sizeBytes);
var chunkDataSize = System.Buffers.Binary.BinaryPrimitives.ReadUInt16LittleEndian(sizeBytes);
var totalNeeded = 3 + chunkDataSize; // header (3) + data
if (input.Length < totalNeeded) return false;
// Write chunk data to internal pipe for background deserialization
if (chunkDataSize > 0)
{
var dataSlice = input.Slice(3, chunkDataSize);
foreach (var segment in dataSlice)
state.InternalPipe.Writer.Write(segment.Span);
state.InternalPipe.Writer.FlushAsync().GetAwaiter().GetResult();
}
// Lazy start: begin background deserialization after first chunk is in the pipe.
// Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available.
if (state.DeserTask == null)
{
var pipeReader = state.InternalPipe.Reader;
var type = state.StreamedArgType;
var opts = _options;
state.DeserTask = Task.Run(() =>
(object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts));
}
input = input.Slice(totalNeeded);
continue; // try next chunk immediately
}
if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data)
{
// Signal end of data → background deser task completes
state.InternalPipe.Writer.Complete();
object? deserializedArg = null;
if (state.DeserTask != null)
{
deserializedArg = state.DeserTask.GetAwaiter().GetResult();
state.InternalPipe.Reader.Complete();
}
// Fill the placeholder in the stored message's args
FillStreamedArg(state, deserializedArg);
_chunkStates!.Remove(binder);
input = input.Slice(1); // consume the single [202] byte
message = state.PartialMessage;
return true;
}
// Unknown byte in chunk mode — break out (shouldn't happen)
break;
}
return false;
}
/// <summary>
/// Parses CHUNK_START: reads original message (with -1 marker for streamed arg),
/// creates internal Pipe, starts background deserialization task, stores state.
/// Returns null to signal "consumed bytes, no complete message yet".
/// </summary>
private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder)
{
r.TryRead(out byte originalMsgType);
// Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments
var partialMessage = originalMsgType switch
{
MsgInvocation => ParseInvocation(ref r, binder),
MsgStreamInvocation => ParseStreamInvocation(ref r, binder),
MsgStreamItem => ParseStreamItem(ref r, binder),
MsgCompletion => ParseCompletion(ref r, binder),
_ => null
};
if (partialMessage == null) return null;
// Find the placeholder arg and its target type
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
var state = new AsyncChunkState
{
PartialMessage = partialMessage,
Args = args,
StreamedArgIndex = streamedIndex,
StreamedArgType = streamedType,
InternalPipe = new Pipe()
// DeserTask started lazily in TryParseChunkData after first chunk is written
};
_chunkStates!.AddOrUpdate(binder, state);
return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData
}
/// <summary>
/// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array,
/// placeholder index, and the target deserialization type.
/// </summary>
private static (object?[] args, int index, Type type) FindStreamedArgSlot(
HubMessage message, IInvocationBinder binder)
{
switch (message)
{
case InvocationMessage inv:
{
var paramTypes = binder.GetParameterTypes(inv.Target);
for (var i = 0; i < inv.Arguments.Length; i++)
{
if (ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder))
{
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (inv.Arguments, i, type);
}
}
break;
}
case StreamInvocationMessage sinv:
{
var paramTypes = binder.GetParameterTypes(sinv.Target);
for (var i = 0; i < sinv.Arguments.Length; i++)
{
if (ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder))
{
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (sinv.Arguments, i, type);
}
}
break;
}
case StreamItemMessage si:
{
if (ReferenceEquals(si.Item, StreamedArgPlaceholder))
{
// StreamItemMessage.Item is read-only, use a wrapper array
var args = new object?[] { si.Item };
var type = binder.GetStreamItemType(si.InvocationId!);
return (args, 0, type);
}
break;
}
case CompletionMessage comp:
{
if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder))
{
var args = new object?[] { comp.Result };
var type = binder.GetReturnType(comp.InvocationId!);
return (args, 0, type);
}
break;
}
}
return (Array.Empty<object?>(), -1, typeof(object));
}
/// <summary>
/// Replaces the StreamedArgPlaceholder with the deserialized value in the stored message.
/// </summary>
private static void FillStreamedArg(AsyncChunkState state, object? deserializedValue)
{
if (state.StreamedArgIndex < 0) return;
switch (state.PartialMessage)
{
case InvocationMessage inv:
inv.Arguments[state.StreamedArgIndex] = deserializedValue;
break;
case StreamInvocationMessage sinv:
sinv.Arguments[state.StreamedArgIndex] = deserializedValue;
break;
case StreamItemMessage:
// StreamItemMessage.Item has no public setter — need to create a new message
if (state.PartialMessage is StreamItemMessage si)
state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue);
break;
case CompletionMessage:
// CompletionMessage.Result has no public setter — need to create a new message
if (state.PartialMessage is CompletionMessage comp)
state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue);
break;
}
}
#endregion
#region Argument Serialization #region Argument Serialization
private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes) private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes)
@ -421,16 +829,15 @@ public class AcBinaryHubProtocol : IHubProtocol
return; return;
} }
// Segment / AsyncSegment: serialize directly to the pipe // Segment mode: serialize directly to the pipe via BufferWriterBinaryOutput
// (AsyncSegment goes through WriteMessageChunked, never reaches here)
bw.FlushAndReset(); bw.FlushAndReset();
// Reserve arg length prefix directly on the pipe // Reserve arg length prefix directly on the pipe
var argLenSpan = output.GetSpan(LengthPrefixSize); var argLenSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize); output.Advance(LengthPrefixSize);
var argBytes = _protocolMode == BinaryProtocolMode.AsyncSegment var argBytes = AcBinarySerializer.Serialize(value, output, _options);
? AcBinarySerializer.Serialize(value, (System.IO.Pipelines.PipeWriter)output, _options)
: AcBinarySerializer.Serialize(value, output, _options);
Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes); Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes);
externalBytes += LengthPrefixSize + argBytes; externalBytes += LengthPrefixSize + argBytes;
@ -470,6 +877,10 @@ public class AcBinaryHubProtocol : IHubProtocol
if (argLength == 0) if (argLength == 0)
return null; return null;
// AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization
if (argLength == -1)
return StreamedArgPlaceholder;
// Null marker check // Null marker check
if (argLength == 1) if (argLength == 1)
{ {

View File

@ -54,7 +54,7 @@ namespace AyCode.Services.SignalRs
.ConfigureLogging(logging => .ConfigureLogging(logging =>
{ {
// alap minimális MS log level // alap minimális MS log level
logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Warning); logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Debug);
// regisztráljuk az AcLoggerProvider-t úgy, hogy visszaadja a meglévő Logger példányt // regisztráljuk az AcLoggerProvider-t úgy, hogy visszaadja a meglévő Logger példányt
logging.AddAcLogger(_ => Logger); logging.AddAcLogger(_ => Logger);
@ -75,7 +75,10 @@ namespace AyCode.Services.SignalRs
var binaryOptions = AcBinarySerializerOptions.Default; var binaryOptions = AcBinarySerializerOptions.Default;
binaryOptions.BufferWriterChunkSize = 4096; binaryOptions.BufferWriterChunkSize = 4096;
return new AyCodeBinaryHubProtocol(binaryOptions, BinaryProtocolMode.AsyncSegment); // AcSignalRClientBase — a 84. sor környékén:
var signalLogger = sp.GetRequiredService<ILoggerFactory>().CreateLogger<AyCodeBinaryHubProtocol>();
return new AyCodeBinaryHubProtocol(binaryOptions, BinaryProtocolMode.AsyncSegment, signalLogger);
// és törölhető: AcBinaryHubProtocol.DiagnosticLogger = msg => Logger.Debug(msg);
}); });
//Vagy ha az options-t is DI-ből: //Vagy ha az options-t is DI-ből:

View File

@ -1,6 +1,7 @@
using System; using System;
using System.Buffers; using System.Buffers;
using AyCode.Core.Serializers.Binaries; using AyCode.Core.Serializers.Binaries;
using Microsoft.Extensions.Logging;
namespace AyCode.Services.SignalRs; namespace AyCode.Services.SignalRs;
@ -18,7 +19,7 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
private SignalParams? _currentSignalParams; private SignalParams? _currentSignalParams;
public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes) : base(options, protocolMode) { } public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { }
protected override void OnArgumentRead(object? value, int index) protected override void OnArgumentRead(object? value, int index)
{ {
@ -32,6 +33,10 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
if (argLength == 0) if (argLength == 0)
return null; return null;
// AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization
if (argLength == -1)
return StreamedArgPlaceholder;
if (argLength == 1) if (argLength == 1)
{ {
r.TryPeek(out byte marker); r.TryPeek(out byte marker);

View File

@ -16,10 +16,12 @@ namespace AyCode.Services.SignalRs;
/// </para> /// </para>
/// <para> /// <para>
/// <b>AsyncSegment</b>: Serialize via <c>AsyncPipeWriterOutput</c> directly to the <c>PipeWriter</c>, /// <b>AsyncSegment</b>: Serialize via <c>AsyncPipeWriterOutput</c> directly to the <c>PipeWriter</c>,
/// per-chunk <c>FlushAsync</c> sends data to the network during serialization. Deserialize via /// per-chunk <c>FlushAsync</c> sends data to the network during serialization using self-describing
/// <c>PipeReaderBinaryInput</c> with on-demand <c>ReadAsync</c> (processes chunks as they arrive). /// chunked framing: each chunk is <c>[201][UINT16 size][data]</c>, end signal is <c>[202]</c>.
/// Zerocopy write + pipeline parallelism (ser/network/deser overlap), highest roundtrip potential /// Deserialize via <c>PipeReaderBinaryInput</c> from internal <c>Pipe</c> with on-demand <c>ReadAsync</c>
/// for large payloads. /// (background Task processes chunks as they arrive). Zerocopy write + pipeline parallelism
/// (ser/network/deser overlap), highest roundtrip potential for large payloads.
/// Max chunk data size: 65535 bytes (UINT16).
/// </para> /// </para>
/// </summary> /// </summary>
public enum BinaryProtocolMode public enum BinaryProtocolMode
@ -37,8 +39,11 @@ public enum BinaryProtocolMode
Segment = 1, Segment = 1,
/// <summary> /// <summary>
/// AsyncPipeWriterOutput → PipeWriter, per-chunk FlushAsync. Deser: PipeReaderBinaryInput (on-demand ReadAsync). /// AsyncPipeWriterOutput → PipeWriter, per-chunk FlushAsync with self-describing chunked framing.
/// Each chunk (including the last) is sent as <c>[201][UINT16 size][data]</c>; end signal is <c>[202]</c>.
/// Deser: PipeReaderBinaryInput from internal Pipe (on-demand ReadAsync, background Task).
/// Zerocopy write + pipeline parallelism (ser/network/deser overlap). /// Zerocopy write + pipeline parallelism (ser/network/deser overlap).
/// Max chunk data size: 65535 bytes (UINT16).
/// </summary> /// </summary>
AsyncSegment = 2, AsyncSegment = 2,
} }

File diff suppressed because one or more lines are too long