diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 5221121..c5b2fbe 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -423,24 +423,28 @@ public static partial class AcBinarySerializer } /// - /// Serialize to PipeWriter with segment streaming (flush per chunk via AsyncPipeWriterOutput). - /// Each chunk is flushed to the network as it fills, enabling pipeline parallelism. - /// Returns total bytes written. + /// Serialize to PipeWriter with chunked protocol framing via AsyncPipeWriterOutput. + /// Each chunk (including the last) is framed as [201][UINT16 size][data] and committed + /// to the PipeWriter via Advance (zero-copy). The protocol layer writes a single [202] + /// byte after to signal end-of-stream. /// - public static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) + /// Total serialized data bytes (excluding framing overhead). + public static int Serialize( + T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, + bool waitForFlush = true) { if (value == null) { + // Null: write directly, no chunking needed var span = pipeWriter.GetSpan(1); span[0] = BinaryTypeCode.Null; pipeWriter.Advance(1); - pipeWriter.FlushAsync().GetAwaiter().GetResult(); return 1; } var runtimeType = value.GetType(); var context = BinarySerializationContextPool.Get(options); - context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize); + context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); try diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index 43bc563..8b953a9 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -1,5 +1,6 @@ using System; using System.Buffers; +using System.Buffers.Binary; using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -9,114 +10,115 @@ using AyCode.Core.Helpers; namespace AyCode.Core.Serializers.Binaries; /// -/// Binary output that writes to a PipeWriter with per-chunk network flush. +/// Binary output that writes to a PipeWriter with per-chunk network flush and self-describing framing. /// -/// Identical to BufferWriterBinaryOutput except: Grow() calls PipeWriter.FlushAsync().Forget() -/// after committing each chunk, so data flows to the network as it's being serialized -/// rather than waiting for the full serialization to complete. +/// Each chunk (including the last) is framed as [201][UINT16 size][data] with a 3-byte header +/// reserved at the start of each buffer. The serializer context writes into the space after the +/// reserved bytes; on Grow(), the header is patched and the full chunk is committed via Advance +/// and flushed to the network. Flush() does the same for the last (partial) chunk — zero-copy +/// for both intermediate and final chunks. /// -/// Backpressure: stores the last FlushAsync ValueTask. If the previous flush hasn't completed -/// by the next Grow(), blocks until it does. This bounds memory to ~2 chunks. +/// The protocol layer writes a single [202] byte after all chunks to signal end-of-stream. /// -/// The first Grow() skips the flush to keep the length prefix span valid for patching. +/// Backpressure modes (controlled by waitForFlush constructor parameter): +/// +/// waitForFlush=true (default): Grow() blocks if the previous FlushAsync hasn't completed. +/// Bounds memory to ~2 chunks in flight. +/// waitForFlush=false: Grow() never blocks. Data accumulates in the PipeWriter's internal +/// buffer and is sent with the next completed flush. Maximum serialization throughput. +/// +/// +/// Maximum chunk data size: 65535 bytes (UINT16 max). /// public struct AsyncPipeWriterOutput : IBinaryOutputBase { + /// MsgAsyncChunkData type marker (201). + private const byte ChunkDataMarker = 201; + + /// Header size: 1 byte type + 2 bytes UINT16 size. + private const int HeaderSize = 3; + + /// Maximum chunk data size (UINT16 max). + public const int MaxChunkSize = ushort.MaxValue; + private readonly PipeWriter _pipeWriter; private readonly int _chunkSize; + private readonly bool _waitForFlush; private int _committedBytes; private int _currentChunkStart; private bool _ownedBuffer; private ValueTask _lastFlush; - private bool _firstGrow; - public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096) + public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true) { + if (chunkSize > MaxChunkSize) + throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, + $"Chunk size cannot exceed {MaxChunkSize} (UINT16 max)."); + _pipeWriter = pipeWriter; _chunkSize = chunkSize; + _waitForFlush = waitForFlush; _committedBytes = 0; _ownedBuffer = false; _lastFlush = default; - _firstGrow = true; } /// - /// Provides the initial buffer from the PipeWriter. + /// Provides the initial buffer from the PipeWriter with 3-byte header reservation. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Initialize(out byte[] buffer, out int position, out int bufferEnd) { _committedBytes = 0; _lastFlush = default; - _firstGrow = true; AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); _currentChunkStart = position; } /// - /// Called when the context's buffer is full. Commits current chunk to the PipeWriter, - /// fires a background flush (except on the first call — length prefix must stay valid), - /// and acquires a new chunk. + /// Called when the context's buffer is full. Patches the chunk header [201][UINT16 size], + /// commits the chunk to the PipeWriter, and fires a background flush. /// [MethodImpl(MethodImplOptions.NoInlining)] public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) { // Backpressure: wait for previous flush if still in progress - if (!_lastFlush.IsCompleted) + if (_waitForFlush && !_lastFlush.IsCompleted) _lastFlush.GetAwaiter().GetResult(); - // Commit bytes written in current chunk - var bytesInChunk = position - _currentChunkStart; - if (bytesInChunk > 0) - { - if (_ownedBuffer) - FlushOwnedBuffer(buffer, bytesInChunk); - else - _pipeWriter.Advance(bytesInChunk); - _committedBytes += bytesInChunk; - } + CommitCurrentChunk(buffer, position); - // Fire-and-forget flush — EXCEPT first chunk (length prefix span must stay valid) - if (!_firstGrow) + // Fire-and-forget flush when previous is done + if (_lastFlush.IsCompleted) { _lastFlush = _pipeWriter.FlushAsync(); _lastFlush.Forget(); } - _firstGrow = false; - // Acquire new chunk + // Acquire new chunk with header reservation AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd); _currentChunkStart = position; } /// - /// Returns total bytes written: committed + pending in current chunk. + /// Returns total serialized data bytes (excluding framing overhead). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public int GetTotalPosition(int currentPosition) => _committedBytes + (currentPosition - _currentChunkStart); /// - /// Commits final pending bytes and performs a synchronous flush. - /// Must be called after all writes are complete. + /// Commits the last (partial) chunk to the PipeWriter with [201][UINT16 size] header. + /// Zero-copy: patches the reserved header bytes and calls Advance — no data copying. + /// Does NOT flush to network — the protocol writes [202] and flushes after. /// public void Flush(byte[] buffer, int position) { - // Wait for any in-flight flush + // Wait for any in-flight flush from previous Grow if (!_lastFlush.IsCompleted) _lastFlush.GetAwaiter().GetResult(); - var bytesInChunk = position - _currentChunkStart; - if (bytesInChunk > 0) - { - if (_ownedBuffer) - FlushOwnedBuffer(buffer, bytesInChunk); - else - _pipeWriter.Advance(bytesInChunk); - } - - // Final synchronous flush — ensures all data reaches the network - _pipeWriter.FlushAsync().GetAwaiter().GetResult(); + CommitCurrentChunk(buffer, position); } /// @@ -124,35 +126,57 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase /// public void Reset() { } - [MethodImpl(MethodImplOptions.NoInlining)] - private void FlushOwnedBuffer(byte[] buffer, int bytesInChunk) + /// + /// Patches [201][UINT16 dataBytes] into the reserved header and commits via Advance. + /// For owned buffers, copies to PipeWriter first. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void CommitCurrentChunk(byte[] buffer, int position) { - var span = _pipeWriter.GetSpan(bytesInChunk); - buffer.AsSpan(_currentChunkStart, bytesInChunk).CopyTo(span); - _pipeWriter.Advance(bytesInChunk); + var dataBytes = position - _currentChunkStart; + if (dataBytes <= 0) return; + + var headerStart = _currentChunkStart - HeaderSize; + buffer[headerStart] = ChunkDataMarker; + BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); + + if (_ownedBuffer) + FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); + else + _pipeWriter.Advance(HeaderSize + dataBytes); + + _committedBytes += dataBytes; // only count data bytes, not framing + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private void FlushOwnedBuffer(byte[] buffer, int start, int length) + { + var span = _pipeWriter.GetSpan(length); + buffer.AsSpan(start, length).CopyTo(span); + _pipeWriter.Advance(length); ArrayPool.Shared.Return(buffer); _ownedBuffer = false; } private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) { - var actualRequest = Math.Max(requestSize, _chunkSize); - var memory = _pipeWriter.GetMemory(actualRequest); + var dataSize = Math.Min(Math.Max(requestSize, _chunkSize), MaxChunkSize); + var totalRequest = dataSize + HeaderSize; + var memory = _pipeWriter.GetMemory(totalRequest); if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && segment.Array != null) { buffer = segment.Array; - position = segment.Offset; - bufferEnd = segment.Offset + segment.Count; + position = segment.Offset + HeaderSize; + bufferEnd = segment.Offset + HeaderSize + dataSize; _ownedBuffer = false; } else { - // Fallback for non-array-backed PipeWriter (e.g. Kestrel PinnedBlockMemoryPool) - var owned = ArrayPool.Shared.Rent(actualRequest); + var owned = ArrayPool.Shared.Rent(totalRequest); buffer = owned; - position = 0; - bufferEnd = owned.Length; + position = HeaderSize; + bufferEnd = HeaderSize + dataSize; _ownedBuffer = true; } } diff --git a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs index 33a51dc..86f158c 100644 --- a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs +++ b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs @@ -60,6 +60,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0) throw new AcBinaryDeserializationException("Empty pipe — no data to read."); + _consumedUpTo = _nextSegmentPosition; // Mark first segment as consumed (same as TryLoadNextSegmentFromBuffer) ExtractArray(memory, out buffer, out position, out bufferLength); } diff --git a/AyCode.Core/docs/BINARY_WRITERS.md b/AyCode.Core/docs/BINARY_WRITERS.md index 5264672..6f83ef4 100644 --- a/AyCode.Core/docs/BINARY_WRITERS.md +++ b/AyCode.Core/docs/BINARY_WRITERS.md @@ -103,27 +103,40 @@ void Release(); ## AsyncPipeWriterOutput -`struct AsyncPipeWriterOutput : IBinaryOutputBase` — writes to `PipeWriter` with per-chunk network flush. Enables segment-level streaming: each serializer chunk goes to the network immediately. +`struct AsyncPipeWriterOutput : IBinaryOutputBase` — writes to `PipeWriter` with per-chunk network flush and **self-describing chunked framing**. Each chunk is framed as `[201][UINT16 size][data]` — zero-copy for both intermediate and final chunks. -### Differences from BufferWriterBinaryOutput +### Chunked Protocol Framing -Same cached chunk pattern (`GetMemory` → `TryGetArray` → direct array writes), but `Grow()` flushes the current chunk to the network before acquiring the next: +Each chunk has a 3-byte header reserved via **header reservation** (skip 3 bytes in `AcquireChunk`, patch before `Advance`): -1. Wait for previous flush if still in-flight (`_lastFlush` backpressure) -2. `Advance(bytesInChunk)` — commit to `PipeWriter` -3. `FlushAsync().Forget()` — fire-and-forget network send -4. Acquire next chunk via `GetMemory` +1. `AcquireChunk`: request `chunkSize + 3` from PipeWriter, set `position = offset + 3` (skip reserved header), force `bufferEnd = offset + 3 + chunkSize` +2. Context writes serializer data into `buffer[position..bufferEnd]` +3. `Grow()`: patch `[201][UINT16 dataBytes]` header, `Advance(3 + dataBytes)`, `FlushAsync().Forget()` +4. `Flush()`: same as Grow — patch header, `Advance(3 + dataBytes)`. Zero-copy, no data copying. The protocol writes a single `[202]` byte after. -**First-Grow skip:** the first `Grow()` call does NOT flush — the length prefix span (reserved by the protocol before serialization) must stay valid until patching. `_firstGrow` flag controls this. +### Backpressure Modes -**Backpressure:** `_lastFlush` (ValueTask) tracks the most recent flush. If the serializer produces chunks faster than the network consumes them, the next `Grow()` waits — max ~2 chunks in memory at any time. +Constructor parameter `waitForFlush` (default `true`): + +- **`waitForFlush=true`**: `Grow()` blocks if previous `FlushAsync` is still in-flight. Max ~2 chunks in memory. +- **`waitForFlush=false`**: `Grow()` never blocks. Data accumulates in PipeWriter's internal buffer and is sent with the next completed flush. Maximum serialization throughput. + +In both modes, flush is only initiated when `_lastFlush.IsCompleted` — no overlapping FlushAsync calls. + +### Wire Format (per chunk) + +``` +CHUNK_DATA: [201][UINT16 size][data bytes] — every chunk (self-describing, variable size) +CHUNK_END: [202] — end signal (1 byte, no data) +``` + +Max chunk data size: 65535 bytes (UINT16 max). ### Usage -Selected via `BinaryProtocolMode.AsyncSegment` in `AcBinaryHubProtocol`. The protocol casts `IBufferWriter output` to `PipeWriter` (safe — SignalR always provides `PipeWriter`). +Selected via `BinaryProtocolMode.AsyncSegment` in `AcBinaryHubProtocol`. The protocol's `WriteMessageChunked` method sends CHUNK_START (standard SignalR framing), then calls the serializer which writes all chunks via `AsyncPipeWriterOutput`, then the protocol writes `[202]`. ```csharp -AcBinarySerializer.Serialize(value, (PipeWriter)output, options) // AsyncPipeWriterOutput path +AcBinarySerializer.Serialize(value, pipeWriter, options); +// All chunks already committed to PipeWriter. Protocol writes [202] and flushes. ``` - -> Known issues and limitations: `BINARY_ISSUES.md` diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index fee8746..944924b 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -1,6 +1,7 @@ using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Text; @@ -8,6 +9,7 @@ using AyCode.Core.Serializers.Binaries; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Protocol; +using Microsoft.Extensions.Logging; namespace AyCode.Services.SignalRs; @@ -46,16 +48,42 @@ public class AcBinaryHubProtocol : IHubProtocol private const byte MsgAck = 8; private const byte MsgSequence = 9; + // Chunked protocol framing for AsyncSegment mode + private const byte MsgAsyncChunkStart = 200; + private const byte MsgAsyncChunkData = 201; + private const byte MsgAsyncChunkEnd = 202; + + /// Sentinel object placed in the args array for the streamed argument (replaced after chunk deserialization). + protected static readonly object StreamedArgPlaceholder = new(); + protected volatile AcBinarySerializerOptions _options; protected readonly BinaryProtocolMode _protocolMode; + protected readonly ILogger? _logger; + + /// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly). + private readonly ConditionalWeakTable? _chunkStates; + + private sealed class AsyncChunkState + { + public HubMessage PartialMessage = null!; + public object?[] Args = null!; + public int StreamedArgIndex; + public Type StreamedArgType = null!; + public Pipe InternalPipe = null!; + public Task? DeserTask; + } public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } - public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes) + public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) { _options = options; _options.BufferWriterChunkSize = 4096; _protocolMode = protocolMode; + _logger = logger; + _chunkStates = protocolMode == BinaryProtocolMode.AsyncSegment + ? new ConditionalWeakTable() + : null; } /// @@ -88,6 +116,15 @@ public class AcBinaryHubProtocol : IHubProtocol public void WriteMessage(HubMessage message, IBufferWriter output) { + // AsyncSegment: chunked protocol framing for messages with streamable arguments + if (_protocolMode == BinaryProtocolMode.AsyncSegment + && output is PipeWriter pipeWriter + && HasStreamableArgs(message)) + { + WriteMessageChunked(message, pipeWriter); + return; + } + // Reserve outer length prefix directly on the pipe (before BWO takes over) var lengthSpan = output.GetSpan(LengthPrefixSize); output.Advance(LengthPrefixSize); @@ -202,12 +239,159 @@ public class AcBinaryHubProtocol : IHubProtocol #endregion + #region Chunked Protocol (AsyncSegment write) + + /// + /// Returns true if the message has arguments that should be streamed via chunked protocol. + /// Only non-null, non-byte[] arguments go through the chunked path. + /// + private static bool HasStreamableArgs(HubMessage message) => message switch + { + InvocationMessage m => HasNonByteArrayArg(m.Arguments), + StreamInvocationMessage m => HasNonByteArrayArg(m.Arguments), + StreamItemMessage m => m.Item != null && m.Item is not byte[], + CompletionMessage m => m.HasResult && m.Result != null && m.Result is not byte[], + _ => false + }; + + private static bool HasNonByteArrayArg(object?[] args) + { + for (var i = args.Length - 1; i >= 0; i--) + { + if (args[i] != null && args[i] is not byte[]) + return true; + } + return false; + } + + /// + /// Gets the last non-null, non-byte[] argument value and its index for streaming. + /// + private static (object? value, int index) GetStreamedArg(HubMessage message) => message switch + { + InvocationMessage m => GetLastNonByteArrayArg(m.Arguments), + StreamInvocationMessage m => GetLastNonByteArrayArg(m.Arguments), + StreamItemMessage m => (m.Item, 0), + CompletionMessage m => (m.Result, 0), + _ => (null, -1) + }; + + private static (object? value, int index) GetLastNonByteArrayArg(object?[] args) + { + for (var i = args.Length - 1; i >= 0; i--) + { + if (args[i] != null && args[i] is not byte[]) + return (args[i], i); + } + return (null, -1); + } + + /// + /// Writes a message using chunked protocol framing for AsyncSegment mode. + /// CHUNK_START: standard SignalR framed message with INT32 -1 for the streamed arg. + /// CHUNK_DATA: [201][UINT16 size][data] per chunk (written by AsyncPipeWriterOutput, zero-copy). + /// CHUNK_END: [202] (1 byte, no data — all data already committed by output). + /// + private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter) + { + var (streamedArg, streamedArgIndex) = GetStreamedArg(message); + + // --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) --- + { + var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize); + pipeWriter.Advance(LengthPrefixSize); + + var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize); + int externalBytes = 0; + + bw.WriteByte(MsgAsyncChunkStart); + + // Write original message body with INT32 -1 for the streamed arg + switch (message) + { + case InvocationMessage m: + bw.WriteByte(MsgInvocation); + WriteNullableString(ref bw, m.InvocationId); + bw.WriteStringUtf8(m.Target); + WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes); + WriteStringArray(ref bw, m.StreamIds); + WriteHeaders(ref bw, m.Headers); + break; + + case StreamInvocationMessage m: + bw.WriteByte(MsgStreamInvocation); + bw.WriteStringUtf8(m.InvocationId!); + bw.WriteStringUtf8(m.Target); + WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes); + WriteStringArray(ref bw, m.StreamIds); + WriteHeaders(ref bw, m.Headers); + break; + + case StreamItemMessage m: + bw.WriteByte(MsgStreamItem); + bw.WriteStringUtf8(m.InvocationId!); + bw.WriteRaw(-1); // streamed arg marker + WriteHeaders(ref bw, m.Headers); + break; + + case CompletionMessage m: + bw.WriteByte(MsgCompletion); + bw.WriteStringUtf8(m.InvocationId!); + WriteNullableString(ref bw, m.Error); + bw.WriteByte(1); // hasResult = true + bw.WriteRaw(-1); // streamed arg marker + WriteHeaders(ref bw, m.Headers); + break; + } + + var totalPayload = bw.Position + externalBytes; + bw.Flush(); + Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); + } + pipeWriter.FlushAsync().GetAwaiter().GetResult(); + + // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- + if (streamedArg != null) + AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options); + + // --- CHUNK_END [202] --- + var endByte = pipeWriter.GetSpan(1); + endByte[0] = MsgAsyncChunkEnd; + pipeWriter.Advance(1); + pipeWriter.FlushAsync().GetAwaiter().GetResult(); + } + + /// + /// Writes arguments for CHUNK_START: all args normally except the streamed one (INT32 -1 marker). + /// + private void WriteArgumentsChunked(ref BufferWriterBinaryOutput bw, IBufferWriter output, + object?[] arguments, int streamedArgIndex, ref int externalBytes) + { + bw.WriteVarUInt((uint)arguments.Length); + for (var i = 0; i < arguments.Length; i++) + { + if (i == streamedArgIndex) + { + bw.WriteRaw(-1); // streamed arg placeholder + continue; + } + WriteArgument(ref bw, output, arguments[i], ref externalBytes); + } + } + + #endregion + #region TryParseMessage public virtual bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message) { message = null; + // AsyncSegment chunk mode: non-standard framing (no INT32 length prefix) + if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState)) + return TryParseChunkData(ref input, chunkState, binder, out message); + + // Normal path var reader = new SequenceReader(input); if (!reader.TryReadLittleEndian(out int payloadLength)) return false; @@ -218,7 +402,17 @@ public class AcBinaryHubProtocol : IHubProtocol message = ParseMessage(ref reader, payloadLength, binder); input = input.Slice(LengthPrefixSize + payloadLength); - return message != null; + if (message != null) + return true; + + // CHUNK_START consumed but no message yet — chunk mode just activated. + // Must try chunk data immediately; returning false here would cause SignalR + // to call AdvanceTo(examined=end) and wait for new data, even though + // CHUNK_DATA/CHUNK_END may already be in the remaining buffer. + if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState)) + return TryParseChunkData(ref input, chunkState, binder, out message); + + return false; } private HubMessage? ParseMessage(ref SequenceReader r, int payloadLength, IInvocationBinder binder) @@ -242,36 +436,39 @@ public class AcBinaryHubProtocol : IHubProtocol MsgClose => ParseClose(ref r), MsgAck => new AckMessage(ReadInt64(ref r)), MsgSequence => new SequenceMessage(ReadInt64(ref r)), + MsgAsyncChunkStart => ParseAsyncChunkStart(ref r, binder), _ => null }; } /// - /// Diagnostic logger for protocol-level debugging. - /// Set to non-null to log target method, arg count, param types during ParseInvocation. + /// Legacy diagnostic logger. Use ILogger via constructor instead. /// + [Obsolete("Use ILogger via constructor parameter instead. This property will be removed in a future version.")] public static Action? DiagnosticLogger { get; set; } [Conditional("DEBUG")] - private static void LogDiagnostic(string message) => DiagnosticLogger?.Invoke(message); + private void LogDiagnostic(string message) => _logger?.LogDebug(message); [Conditional("DEBUG")] - private static void LogReadSingleArgument(ReadOnlySequence argSlice, int argLength, Type targetType) + private void LogReadSingleArgument(ReadOnlySequence argSlice, int argLength, Type targetType) { - if (DiagnosticLogger == null) return; + if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; var segmentCount = 0; foreach (var _ in argSlice) segmentCount++; - DiagnosticLogger($"[AcBinaryHubProtocol] ReadSingleArgument: argLength={argLength}, isSingleSegment={argSlice.IsSingleSegment}, segments={segmentCount}, type={targetType.Name}"); + _logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}", + argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name); } [Conditional("DEBUG")] - private static void LogParseInvocation(string target, IReadOnlyList paramTypes, long remaining) + private void LogParseInvocation(string target, IReadOnlyList paramTypes, long remaining) { - if (DiagnosticLogger == null) return; + if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; var typeNames = new string[paramTypes.Count]; for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name; - DiagnosticLogger($"[AcBinaryHubProtocol] ParseInvocation target='{target}'; paramTypes.Count={paramTypes.Count}; types=[{string.Join(", ", typeNames)}]; remaining={remaining}"); + _logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}", + target, paramTypes.Count, string.Join(", ", typeNames), remaining); } private HubMessage ParseInvocation(ref SequenceReader r, IInvocationBinder binder) @@ -378,6 +575,217 @@ public class AcBinaryHubProtocol : IHubProtocol #endregion + #region Chunked Protocol (AsyncSegment read) + + /// + /// Processes CHUNK_DATA and CHUNK_END in chunk accumulation mode. + /// Called from TryParseMessage when an active AsyncChunkState exists for this connection. + /// Loops over all available chunks — critical because SignalR's while loop exits when + /// TryParseMessage returns false, and won't re-enter until new data arrives on the pipe. + /// + private bool TryParseChunkData(ref ReadOnlySequence input, AsyncChunkState state, + IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message) + { + message = null; + + while (input.Length >= 1) + { + var firstByte = input.FirstSpan[0]; + + if (firstByte == MsgAsyncChunkData) // 201 — self-describing data chunk [201][UINT16 size][data] + { + // Need at least [201][UINT16] + if (input.Length < 3) return false; + + // Read UINT16 chunk data size + var headerSlice = input.Slice(1, 2); + Span sizeBytes = stackalloc byte[2]; + headerSlice.CopyTo(sizeBytes); + var chunkDataSize = System.Buffers.Binary.BinaryPrimitives.ReadUInt16LittleEndian(sizeBytes); + + var totalNeeded = 3 + chunkDataSize; // header (3) + data + if (input.Length < totalNeeded) return false; + + // Write chunk data to internal pipe for background deserialization + if (chunkDataSize > 0) + { + var dataSlice = input.Slice(3, chunkDataSize); + foreach (var segment in dataSlice) + state.InternalPipe.Writer.Write(segment.Span); + state.InternalPipe.Writer.FlushAsync().GetAwaiter().GetResult(); + } + + // Lazy start: begin background deserialization after first chunk is in the pipe. + // Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available. + if (state.DeserTask == null) + { + var pipeReader = state.InternalPipe.Reader; + var type = state.StreamedArgType; + var opts = _options; + state.DeserTask = Task.Run(() => + (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts)); + } + + input = input.Slice(totalNeeded); + continue; // try next chunk immediately + } + + if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data) + { + // Signal end of data → background deser task completes + state.InternalPipe.Writer.Complete(); + object? deserializedArg = null; + if (state.DeserTask != null) + { + deserializedArg = state.DeserTask.GetAwaiter().GetResult(); + state.InternalPipe.Reader.Complete(); + } + + // Fill the placeholder in the stored message's args + FillStreamedArg(state, deserializedArg); + + _chunkStates!.Remove(binder); + input = input.Slice(1); // consume the single [202] byte + message = state.PartialMessage; + return true; + } + + // Unknown byte in chunk mode — break out (shouldn't happen) + break; + } + + return false; + } + + /// + /// Parses CHUNK_START: reads original message (with -1 marker for streamed arg), + /// creates internal Pipe, starts background deserialization task, stores state. + /// Returns null to signal "consumed bytes, no complete message yet". + /// + private HubMessage? ParseAsyncChunkStart(ref SequenceReader r, IInvocationBinder binder) + { + r.TryRead(out byte originalMsgType); + + // Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments + var partialMessage = originalMsgType switch + { + MsgInvocation => ParseInvocation(ref r, binder), + MsgStreamInvocation => ParseStreamInvocation(ref r, binder), + MsgStreamItem => ParseStreamItem(ref r, binder), + MsgCompletion => ParseCompletion(ref r, binder), + _ => null + }; + + if (partialMessage == null) return null; + + // Find the placeholder arg and its target type + var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder); + + var state = new AsyncChunkState + { + PartialMessage = partialMessage, + Args = args, + StreamedArgIndex = streamedIndex, + StreamedArgType = streamedType, + InternalPipe = new Pipe() + // DeserTask started lazily in TryParseChunkData after first chunk is written + }; + + _chunkStates!.AddOrUpdate(binder, state); + return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData + } + + /// + /// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array, + /// placeholder index, and the target deserialization type. + /// + private static (object?[] args, int index, Type type) FindStreamedArgSlot( + HubMessage message, IInvocationBinder binder) + { + switch (message) + { + case InvocationMessage inv: + { + var paramTypes = binder.GetParameterTypes(inv.Target); + for (var i = 0; i < inv.Arguments.Length; i++) + { + if (ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) + { + var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); + return (inv.Arguments, i, type); + } + } + break; + } + case StreamInvocationMessage sinv: + { + var paramTypes = binder.GetParameterTypes(sinv.Target); + for (var i = 0; i < sinv.Arguments.Length; i++) + { + if (ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) + { + var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); + return (sinv.Arguments, i, type); + } + } + break; + } + case StreamItemMessage si: + { + if (ReferenceEquals(si.Item, StreamedArgPlaceholder)) + { + // StreamItemMessage.Item is read-only, use a wrapper array + var args = new object?[] { si.Item }; + var type = binder.GetStreamItemType(si.InvocationId!); + return (args, 0, type); + } + break; + } + case CompletionMessage comp: + { + if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder)) + { + var args = new object?[] { comp.Result }; + var type = binder.GetReturnType(comp.InvocationId!); + return (args, 0, type); + } + break; + } + } + + return (Array.Empty(), -1, typeof(object)); + } + + /// + /// Replaces the StreamedArgPlaceholder with the deserialized value in the stored message. + /// + private static void FillStreamedArg(AsyncChunkState state, object? deserializedValue) + { + if (state.StreamedArgIndex < 0) return; + + switch (state.PartialMessage) + { + case InvocationMessage inv: + inv.Arguments[state.StreamedArgIndex] = deserializedValue; + break; + case StreamInvocationMessage sinv: + sinv.Arguments[state.StreamedArgIndex] = deserializedValue; + break; + case StreamItemMessage: + // StreamItemMessage.Item has no public setter — need to create a new message + if (state.PartialMessage is StreamItemMessage si) + state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue); + break; + case CompletionMessage: + // CompletionMessage.Result has no public setter — need to create a new message + if (state.PartialMessage is CompletionMessage comp) + state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue); + break; + } + } + + #endregion + #region Argument Serialization private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter output, object?[] arguments, ref int externalBytes) @@ -421,16 +829,15 @@ public class AcBinaryHubProtocol : IHubProtocol return; } - // Segment / AsyncSegment: serialize directly to the pipe + // Segment mode: serialize directly to the pipe via BufferWriterBinaryOutput + // (AsyncSegment goes through WriteMessageChunked, never reaches here) bw.FlushAndReset(); // Reserve arg length prefix directly on the pipe var argLenSpan = output.GetSpan(LengthPrefixSize); output.Advance(LengthPrefixSize); - var argBytes = _protocolMode == BinaryProtocolMode.AsyncSegment - ? AcBinarySerializer.Serialize(value, (System.IO.Pipelines.PipeWriter)output, _options) - : AcBinarySerializer.Serialize(value, output, _options); + var argBytes = AcBinarySerializer.Serialize(value, output, _options); Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes); externalBytes += LengthPrefixSize + argBytes; @@ -470,6 +877,10 @@ public class AcBinaryHubProtocol : IHubProtocol if (argLength == 0) return null; + // AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization + if (argLength == -1) + return StreamedArgPlaceholder; + // Null marker check if (argLength == 1) { diff --git a/AyCode.Services/SignalRs/AcSignalRClientBase.cs b/AyCode.Services/SignalRs/AcSignalRClientBase.cs index 24fabbc..e54b630 100644 --- a/AyCode.Services/SignalRs/AcSignalRClientBase.cs +++ b/AyCode.Services/SignalRs/AcSignalRClientBase.cs @@ -54,7 +54,7 @@ namespace AyCode.Services.SignalRs .ConfigureLogging(logging => { // alap minimális MS log level - logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Warning); + logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Debug); // regisztráljuk az AcLoggerProvider-t úgy, hogy visszaadja a meglévő Logger példányt logging.AddAcLogger(_ => Logger); @@ -75,7 +75,10 @@ namespace AyCode.Services.SignalRs var binaryOptions = AcBinarySerializerOptions.Default; binaryOptions.BufferWriterChunkSize = 4096; - return new AyCodeBinaryHubProtocol(binaryOptions, BinaryProtocolMode.AsyncSegment); + // AcSignalRClientBase — a 84. sor környékén: + var signalLogger = sp.GetRequiredService().CreateLogger(); + return new AyCodeBinaryHubProtocol(binaryOptions, BinaryProtocolMode.AsyncSegment, signalLogger); + // és törölhető: AcBinaryHubProtocol.DiagnosticLogger = msg => Logger.Debug(msg); }); //Vagy ha az options-t is DI-ből: diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs index 41406bb..0f06b42 100644 --- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs @@ -1,6 +1,7 @@ using System; using System.Buffers; using AyCode.Core.Serializers.Binaries; +using Microsoft.Extensions.Logging; namespace AyCode.Services.SignalRs; @@ -18,7 +19,7 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol private SignalParams? _currentSignalParams; public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } - public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes) : base(options, protocolMode) { } + public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { } protected override void OnArgumentRead(object? value, int index) { @@ -32,6 +33,10 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol if (argLength == 0) return null; + // AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization + if (argLength == -1) + return StreamedArgPlaceholder; + if (argLength == 1) { r.TryPeek(out byte marker); diff --git a/AyCode.Services/SignalRs/BinaryProtocolMode.cs b/AyCode.Services/SignalRs/BinaryProtocolMode.cs index 9e30520..e37c51f 100644 --- a/AyCode.Services/SignalRs/BinaryProtocolMode.cs +++ b/AyCode.Services/SignalRs/BinaryProtocolMode.cs @@ -16,10 +16,12 @@ namespace AyCode.Services.SignalRs; /// /// /// AsyncSegment: Serialize via AsyncPipeWriterOutput directly to the PipeWriter, -/// per-chunk FlushAsync sends data to the network during serialization. Deserialize via -/// PipeReaderBinaryInput with on-demand ReadAsync (processes chunks as they arrive). -/// Zerocopy write + pipeline parallelism (ser/network/deser overlap), highest roundtrip potential -/// for large payloads. +/// per-chunk FlushAsync sends data to the network during serialization using self-describing +/// chunked framing: each chunk is [201][UINT16 size][data], end signal is [202]. +/// Deserialize via PipeReaderBinaryInput from internal Pipe with on-demand ReadAsync +/// (background Task processes chunks as they arrive). Zerocopy write + pipeline parallelism +/// (ser/network/deser overlap), highest roundtrip potential for large payloads. +/// Max chunk data size: 65535 bytes (UINT16). /// /// public enum BinaryProtocolMode @@ -37,8 +39,11 @@ public enum BinaryProtocolMode Segment = 1, /// - /// AsyncPipeWriterOutput → PipeWriter, per-chunk FlushAsync. Deser: PipeReaderBinaryInput (on-demand ReadAsync). + /// AsyncPipeWriterOutput → PipeWriter, per-chunk FlushAsync with self-describing chunked framing. + /// Each chunk (including the last) is sent as [201][UINT16 size][data]; end signal is [202]. + /// Deser: PipeReaderBinaryInput from internal Pipe (on-demand ReadAsync, background Task). /// Zerocopy write + pipeline parallelism (ser/network/deser overlap). + /// Max chunk data size: 65535 bytes (UINT16). /// AsyncSegment = 2, } diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md index 61581c5..84304c3 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md @@ -166,8 +166,10 @@ Typical overhead for 225KB payload with 4096-byte segments: ~224.5KB zero-copy, |-------|-----------|-------------|-----------------| | `Bytes` (default) | `ArrayBinaryOutput` → `byte[]` → write to pipe as raw blob | `SequenceReader.ToArray()` → `ArrayBinaryInput` (single contiguous buffer, `TryAdvanceSegment` → false, JIT-eliminated) | Fastest individual ser/deser. No zerocopy. No pipeline overlap. | | `Segment` | `BufferWriterBinaryOutput` → directly to `PipeWriter`, chunk-by-chunk, single `Flush` at end | `SequenceBinaryInput` → multi-segment `ReadOnlySequence` (lazy `TryGet` iteration, cross-boundary scratch) | Zerocopy write. No pipeline overlap. | -| `AsyncSegment` | `AsyncPipeWriterOutput` → directly to `PipeWriter`, per-chunk `FlushAsync().Forget()` with backpressure | `PipeReaderBinaryInput` → on-demand `ReadAsync`, processes chunks as they arrive from the network | Zerocopy write + pipeline parallelism (ser/network/deser overlap). Highest roundtrip potential for large payloads. | +| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing via `PipeWriter`, per-chunk `FlushAsync().Forget()` | `PipeReaderBinaryInput` from internal `Pipe` → background `Task` deser, on-demand `ReadAsync` | Zerocopy write + pipeline parallelism (ser/network/deser overlap). Max chunk: 65535 bytes (UINT16). | -In `AsyncSegment` mode, `WriteArgument` casts `IBufferWriter output` to `PipeWriter` and calls `AcBinarySerializer.Serialize(value, pipeWriter, options)` which uses `AsyncPipeWriterOutput` internally. In `Bytes` and `Segment` mode, the standard `AcBinarySerializer.Serialize(value, output, options)` path is used (BWO on `IBufferWriter`). +In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA — `[201][UINT16 size][data]` per chunk (written by `AsyncPipeWriterOutput` with 3-byte header reservation, zero-copy), (3) CHUNK_END — `[202]` (1 byte, no data). The receiver's `TryParseMessage` enters chunk accumulation mode after CHUNK_START, feeding data to an internal `Pipe` where a background `Task` deserializes via `PipeReaderBinaryInput`. + +In `Bytes` and `Segment` mode, the standard `WriteMessage` path is used. **Source:** `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` (base), `AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs` (consumer logic), `AyCode.Services/SignalRs/BinaryProtocolMode.cs` (enum)