From 8ff75de55cebd9aaca06b6fc4882e3fc8671c173 Mon Sep 17 00:00:00 2001 From: Loretta Date: Fri, 10 Apr 2026 09:27:40 +0200 Subject: [PATCH] Add segment streaming to SignalR binary protocol Implements segment-level streaming for SignalR binary protocol via new AsyncPipeWriterOutput and PipeReaderBinaryInput types, enabling chunked serialization/deserialization directly over PipeWriter/PipeReader. Adds BinaryProtocolMode enum to select between standard and streaming modes. Updates protocol classes and documentation. Lays groundwork for future async streaming support. --- .claude/settings.local.json | 3 +- AyCode.Core/Helpers/TaskHelper.cs | 18 ++ .../Binaries/AcBinaryDeserializer.cs | 13 ++ .../Binaries/AcBinarySerializer.cs | 69 ++++++ .../Binaries/AsyncPipeWriterOutput.cs | 159 +++++++++++++ .../Binaries/PipeReaderBinaryInput.cs | 215 ++++++++++++++++++ AyCode.Core/docs/BINARY_IMPLEMENTATION.md | 3 +- AyCode.Core/docs/BINARY_ISSUES.md | 27 +++ AyCode.Core/docs/BINARY_WRITERS.md | 26 +++ .../SignalRs/AcBinaryHubProtocol.cs | 8 +- .../SignalRs/AyCodeBinaryHubProtocol.cs | 2 +- .../SignalRs/BinaryProtocolMode.cs | 16 ++ AyCode.Services/docs/SIGNALR.md | 3 +- .../docs/SIGNALR_BINARY_PROTOCOL.md | 14 +- 14 files changed, 569 insertions(+), 7 deletions(-) create mode 100644 AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs create mode 100644 AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs create mode 100644 AyCode.Services/SignalRs/BinaryProtocolMode.cs diff --git a/.claude/settings.local.json b/.claude/settings.local.json index bdc223e..df63b95 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -51,7 +51,8 @@ "Bash(2)", "Bash(dotnet --version)", "WebSearch", - "Bash(dotnet script:*)" + "Bash(dotnet script:*)", + "Bash(xargs wc:*)" ] } } diff --git a/AyCode.Core/Helpers/TaskHelper.cs b/AyCode.Core/Helpers/TaskHelper.cs index afc1596..4cc1ac2 100644 --- a/AyCode.Core/Helpers/TaskHelper.cs +++ b/AyCode.Core/Helpers/TaskHelper.cs @@ -84,6 +84,24 @@ } } + public static void Forget(this ValueTask task) + { + if (!task.IsCompleted || task.IsFaulted) + _ = ForgetAwaited(task); + + static async Task ForgetAwaited(ValueTask task) + { + try + { + await task.ConfigureAwait(false); + } + catch + { + // Swallow exception - fire and forget semantics + } + } + } + public static void RunOnThreadPool(this Action action) => ToThreadPoolTask(action).Forget(); public static void RunOnThreadPool(this Func func) => ToThreadPoolTask(func).Forget(); diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index cd7e4bc..224cad1 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -291,6 +291,19 @@ public static partial class AcBinaryDeserializer return DeserializeSequence(new SequenceBinaryInput(data), targetType, options); } + /// + /// Deserialize from PipeReader with segment streaming (read per chunk via PipeReaderBinaryInput). + /// Data is consumed as it arrives from the network, enabling pipeline parallelism. + /// + public static T? Deserialize(System.IO.Pipelines.PipeReader pipeReader, AcBinarySerializerOptions options) + => DeserializeSequence(new PipeReaderBinaryInput(pipeReader), typeof(T), options); + + /// + /// Deserialize from PipeReader to specified type with segment streaming. + /// + public static object? Deserialize(System.IO.Pipelines.PipeReader pipeReader, Type targetType, AcBinarySerializerOptions options) + => DeserializeSequence(new PipeReaderBinaryInput(pipeReader), targetType, options); + /// /// Internal: Deserialize with any TInput (multi-segment or other future input types). /// diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 98a7c86..5221121 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -422,6 +422,66 @@ public static partial class AcBinarySerializer } } + /// + /// Serialize to PipeWriter with segment streaming (flush per chunk via AsyncPipeWriterOutput). + /// Each chunk is flushed to the network as it fills, enabling pipeline parallelism. + /// Returns total bytes written. + /// + public static int Serialize(T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options) + { + if (value == null) + { + var span = pipeWriter.GetSpan(1); + span[0] = BinaryTypeCode.Null; + pipeWriter.Advance(1); + pipeWriter.FlushAsync().GetAwaiter().GetResult(); + return 1; + } + + var runtimeType = value.GetType(); + var context = BinarySerializationContextPool.Get(options); + context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize); + context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); + + try + { + if (options.UseGeneratedCode) + { + var wrapper = context.GetWrapper(runtimeType); + if (wrapper.GeneratedWriter != null) + { + ScanForDuplicates(value, runtimeType, context); + context.WriteHeader(); + WriteObject(value, wrapper, context, 0); + + if (options.UseCompression != Compression.Lz4CompressionMode.None) + ThrowCompressionNotSupportedWithPipeWriter(context); + + var bytesWritten = context.Output.GetTotalPosition(context._position); + context.Output.Flush(context._buffer, context._position); + return bytesWritten; + } + } + + var actualValue = ConvertExpressionValue(value, ref runtimeType); + ScanForDuplicates(actualValue, runtimeType, context); + context.WriteHeader(); + WriteValue(actualValue, runtimeType, context, 0); + + if (options.UseCompression != Compression.Lz4CompressionMode.None) + ThrowCompressionNotSupportedWithPipeWriter(context); + + var totalBytesWritten = context.Output.GetTotalPosition(context._position); + context.Output.Flush(context._buffer, context._position); + return totalBytesWritten; + } + finally + { + context.Output = default; + ReturnContext(context, options); + } + } + /// /// Get the serialized size without allocating the final array. /// Useful for pre-allocating buffers. @@ -541,6 +601,15 @@ public static partial class AcBinarySerializer "Use the byte[] overload or disable compression."); } + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowCompressionNotSupportedWithPipeWriter(BinarySerializationContext context) + { + context.Output.Flush(context._buffer, context._position); + throw new NotSupportedException( + "Compression is not supported with PipeWriter output. " + + "Use the byte[] overload or disable compression."); + } + #endregion #endregion diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs new file mode 100644 index 0000000..43bc563 --- /dev/null +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -0,0 +1,159 @@ +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using AyCode.Core.Helpers; + +namespace AyCode.Core.Serializers.Binaries; + +/// +/// Binary output that writes to a PipeWriter with per-chunk network flush. +/// +/// Identical to BufferWriterBinaryOutput except: Grow() calls PipeWriter.FlushAsync().Forget() +/// after committing each chunk, so data flows to the network as it's being serialized +/// rather than waiting for the full serialization to complete. +/// +/// Backpressure: stores the last FlushAsync ValueTask. If the previous flush hasn't completed +/// by the next Grow(), blocks until it does. This bounds memory to ~2 chunks. +/// +/// The first Grow() skips the flush to keep the length prefix span valid for patching. +/// +public struct AsyncPipeWriterOutput : IBinaryOutputBase +{ + private readonly PipeWriter _pipeWriter; + private readonly int _chunkSize; + private int _committedBytes; + private int _currentChunkStart; + private bool _ownedBuffer; + private ValueTask _lastFlush; + private bool _firstGrow; + + public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096) + { + _pipeWriter = pipeWriter; + _chunkSize = chunkSize; + _committedBytes = 0; + _ownedBuffer = false; + _lastFlush = default; + _firstGrow = true; + } + + /// + /// Provides the initial buffer from the PipeWriter. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Initialize(out byte[] buffer, out int position, out int bufferEnd) + { + _committedBytes = 0; + _lastFlush = default; + _firstGrow = true; + AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); + _currentChunkStart = position; + } + + /// + /// Called when the context's buffer is full. Commits current chunk to the PipeWriter, + /// fires a background flush (except on the first call — length prefix must stay valid), + /// and acquires a new chunk. + /// + [MethodImpl(MethodImplOptions.NoInlining)] + public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) + { + // Backpressure: wait for previous flush if still in progress + if (!_lastFlush.IsCompleted) + _lastFlush.GetAwaiter().GetResult(); + + // Commit bytes written in current chunk + var bytesInChunk = position - _currentChunkStart; + if (bytesInChunk > 0) + { + if (_ownedBuffer) + FlushOwnedBuffer(buffer, bytesInChunk); + else + _pipeWriter.Advance(bytesInChunk); + _committedBytes += bytesInChunk; + } + + // Fire-and-forget flush — EXCEPT first chunk (length prefix span must stay valid) + if (!_firstGrow) + { + _lastFlush = _pipeWriter.FlushAsync(); + _lastFlush.Forget(); + } + _firstGrow = false; + + // Acquire new chunk + AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd); + _currentChunkStart = position; + } + + /// + /// Returns total bytes written: committed + pending in current chunk. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public int GetTotalPosition(int currentPosition) + => _committedBytes + (currentPosition - _currentChunkStart); + + /// + /// Commits final pending bytes and performs a synchronous flush. + /// Must be called after all writes are complete. + /// + public void Flush(byte[] buffer, int position) + { + // Wait for any in-flight flush + if (!_lastFlush.IsCompleted) + _lastFlush.GetAwaiter().GetResult(); + + var bytesInChunk = position - _currentChunkStart; + if (bytesInChunk > 0) + { + if (_ownedBuffer) + FlushOwnedBuffer(buffer, bytesInChunk); + else + _pipeWriter.Advance(bytesInChunk); + } + + // Final synchronous flush — ensures all data reaches the network + _pipeWriter.FlushAsync().GetAwaiter().GetResult(); + } + + /// + /// No-op for PipeWriter-based output — chunks are owned by PipeWriter, not us. + /// + public void Reset() { } + + [MethodImpl(MethodImplOptions.NoInlining)] + private void FlushOwnedBuffer(byte[] buffer, int bytesInChunk) + { + var span = _pipeWriter.GetSpan(bytesInChunk); + buffer.AsSpan(_currentChunkStart, bytesInChunk).CopyTo(span); + _pipeWriter.Advance(bytesInChunk); + ArrayPool.Shared.Return(buffer); + _ownedBuffer = false; + } + + private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) + { + var actualRequest = Math.Max(requestSize, _chunkSize); + var memory = _pipeWriter.GetMemory(actualRequest); + + if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && segment.Array != null) + { + buffer = segment.Array; + position = segment.Offset; + bufferEnd = segment.Offset + segment.Count; + _ownedBuffer = false; + } + else + { + // Fallback for non-array-backed PipeWriter (e.g. Kestrel PinnedBlockMemoryPool) + var owned = ArrayPool.Shared.Rent(actualRequest); + buffer = owned; + position = 0; + bufferEnd = owned.Length; + _ownedBuffer = true; + } + } +} diff --git a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs new file mode 100644 index 0000000..33a51dc --- /dev/null +++ b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs @@ -0,0 +1,215 @@ +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +namespace AyCode.Core.Serializers.Binaries; + +/// +/// Binary input that reads from a PipeReader, requesting more data when the current buffer is exhausted. +/// +/// Mirrors SequenceBinaryInput's segment iteration and cross-boundary handling, but instead of +/// reading from a fixed ReadOnlySequence, it calls PipeReader.ReadAsync() to get more data +/// as chunks arrive from the network. +/// +/// When the writer flushes per chunk (AsyncPipeWriterOutput), ReadAsync() returns as soon as +/// data is available — typically completing synchronously. This enables pipeline parallelism: +/// serialization, network transfer, and deserialization overlap. +/// +public struct PipeReaderBinaryInput : IBinaryInputBase +{ + private readonly PipeReader _pipeReader; + private ReadOnlySequence _currentBuffer; + private SequencePosition _nextSegmentPosition; + private SequencePosition _consumedUpTo; + private bool _pipeCompleted; + + // Cross-boundary scratch — same pattern as SequenceBinaryInput + private byte[]? _scratchBuffer; + private bool _afterCrossBoundary; + private byte[]? _savedBuffer; + private int _savedPosition; + private int _savedBufferLength; + + public PipeReaderBinaryInput(PipeReader pipeReader) + { + _pipeReader = pipeReader; + _currentBuffer = default; + _nextSegmentPosition = default; + _consumedUpTo = default; + _pipeCompleted = false; + _scratchBuffer = null; + _afterCrossBoundary = false; + _savedBuffer = null; + _savedPosition = 0; + _savedBufferLength = 0; + } + + /// + /// Reads the first data from the PipeReader and provides the first segment's buffer. + /// + public void Initialize(out byte[] buffer, out int position, out int bufferLength) + { + var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); + _currentBuffer = result.Buffer; + _pipeCompleted = result.IsCompleted; + _consumedUpTo = _currentBuffer.Start; + _nextSegmentPosition = _currentBuffer.Start; + + if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0) + throw new AcBinaryDeserializationException("Empty pipe — no data to read."); + + ExtractArray(memory, out buffer, out position, out bufferLength); + } + + /// + /// Advances to the next segment. If the current ReadResult buffer is exhausted, + /// calls PipeReader.ReadAsync() to get more data from the pipe. + /// + [MethodImpl(MethodImplOptions.NoInlining)] + public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed) + { + // After cross-boundary scratch read: restore to the last touched segment + if (_afterCrossBoundary) + { + _afterCrossBoundary = false; + buffer = _savedBuffer!; + position = _savedPosition; + bufferLength = _savedBufferLength; + + if (bufferLength - position >= needed) + return true; + } + + var remaining = bufferLength - position; + + if (remaining > 0 && remaining < needed) + return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining); + + // Try next segment in current buffer + if (TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength)) + { + remaining = bufferLength - position; + if (remaining >= needed) return true; + return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining); + } + + // Current buffer exhausted — request more data from PipeReader + if (_pipeCompleted) + return false; + + _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); + + var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); + _currentBuffer = result.Buffer; + _pipeCompleted = result.IsCompleted; + _consumedUpTo = _currentBuffer.Start; + _nextSegmentPosition = _currentBuffer.Start; + + if (!TryLoadNextSegmentFromBuffer(ref buffer, ref position, ref bufferLength)) + return false; + + remaining = bufferLength - position; + if (remaining >= needed) return true; + return TryReadCrossBoundary(ref buffer, ref position, ref bufferLength, needed, remaining); + } + + /// + /// Returns the scratch buffer and signals the PipeReader that all data has been consumed. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Release() + { + if (_scratchBuffer != null) + { + ArrayPool.Shared.Return(_scratchBuffer); + _scratchBuffer = null; + } + + _pipeReader.AdvanceTo(_currentBuffer.End); + } + + private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength) + { + if (!_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) || memory.Length == 0) + return false; + + ExtractArray(memory, out buffer, out position, out bufferLength); + _consumedUpTo = _nextSegmentPosition; + return true; + } + + private bool TryReadCrossBoundary(ref byte[] buffer, ref int position, ref int bufferLength, int needed, int remaining) + { + if (_scratchBuffer == null || _scratchBuffer.Length < needed) + { + if (_scratchBuffer != null) + ArrayPool.Shared.Return(_scratchBuffer); + _scratchBuffer = ArrayPool.Shared.Rent(needed); + } + + // 1) Copy tail of current segment + Buffer.BlockCopy(buffer, position, _scratchBuffer, 0, remaining); + var filled = remaining; + + // 2) Copy from subsequent segments (may span multiple segments or pipe reads) + while (filled < needed) + { + // Try next segment in current buffer + if (_currentBuffer.TryGet(ref _nextSegmentPosition, out var memory) && memory.Length > 0) + { + ExtractArray(memory, out var segArray, out var segOffset, out var segBufferLength); + _consumedUpTo = _nextSegmentPosition; + var segCount = segBufferLength - segOffset; + var take = Math.Min(needed - filled, segCount); + Buffer.BlockCopy(segArray, segOffset, _scratchBuffer, filled, take); + filled += take; + + _savedBuffer = segArray; + _savedPosition = segOffset + take; + _savedBufferLength = segBufferLength; + continue; + } + + // Current buffer exhausted — read more from pipe + if (_pipeCompleted) + return false; + + _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); + var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); + _currentBuffer = result.Buffer; + _pipeCompleted = result.IsCompleted; + _consumedUpTo = _currentBuffer.Start; + _nextSegmentPosition = _currentBuffer.Start; + + if (_currentBuffer.IsEmpty && _pipeCompleted) + return false; + } + + buffer = _scratchBuffer; + position = 0; + bufferLength = filled; + _afterCrossBoundary = true; + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static void ExtractArray(ReadOnlyMemory memory, out byte[] buffer, out int position, out int bufferLength) + { + if (MemoryMarshal.TryGetArray(memory, out var segment)) + { + buffer = segment.Array!; + position = segment.Offset; + bufferLength = segment.Offset + segment.Count; + } + else + { + var temp = new byte[memory.Length]; + memory.Span.CopyTo(temp); + buffer = temp; + position = 0; + bufferLength = temp.Length; + } + } +} diff --git a/AyCode.Core/docs/BINARY_IMPLEMENTATION.md b/AyCode.Core/docs/BINARY_IMPLEMENTATION.md index a6242d8..e6d9784 100644 --- a/AyCode.Core/docs/BINARY_IMPLEMENTATION.md +++ b/AyCode.Core/docs/BINARY_IMPLEMENTATION.md @@ -63,7 +63,7 @@ SGen fast path: **Non-SGen penalty:** +1 bool check (`options.UseGeneratedCode`) + 1 `GetWrapper` (cached) + 1 null check ≈ ~10-15ns. -**Location:** `AcBinarySerializer.cs` — both `Serialize(T, options)` (byte[] path) and `Serialize(T, IBufferWriter, options)`. +**Location:** `AcBinarySerializer.cs` — `Serialize(T, options)` (byte[] path), `Serialize(T, IBufferWriter, options)` (BWO path), and `Serialize(T, PipeWriter, options)` (AsyncPipeWriterOutput segment-streaming path). ### Full Runtime Dispatch Chain @@ -130,6 +130,7 @@ Two-phase: - `BinarySerializationContextPool` — byte[] path - `BinarySerializationContextPool` — IBufferWriter path +- `BinarySerializationContextPool` — PipeWriter segment-streaming path - `options.UseAsync` → `ReturnAsync` (ThreadPool enqueue) to avoid lock contention - Pooled contexts retain wrapper caches, buffer instances across serializations diff --git a/AyCode.Core/docs/BINARY_ISSUES.md b/AyCode.Core/docs/BINARY_ISSUES.md index a3b6cc2..3ac2046 100644 --- a/AyCode.Core/docs/BINARY_ISSUES.md +++ b/AyCode.Core/docs/BINARY_ISSUES.md @@ -48,6 +48,33 @@ The scratch buffer is `ArrayPool.Rent`-ed on first cross-boundary read and reuse When `MemoryMarshal.TryGetArray` fails on `IBufferWriter.GetMemory()` (native memory-backed writer), a `byte[]` is rented from `ArrayPool` per chunk and copied to the writer on `Grow`/`Flush`. Same as DESER-1 — non-array-backed writers are extremely rare. +### SER-2: AsyncPipeWriterOutput uses sync GetResult() for backpressure + +**Status:** By design (v1) +**Affects:** `AsyncPipeWriterOutput.Grow()` — `_lastFlush.GetAwaiter().GetResult()` + +When the previous `PipeWriter.FlushAsync()` hasn't completed by the next `Grow()` call, the serializer blocks the thread until the flush completes. This is necessary because `IHubProtocol.WriteMessage` is `void` (synchronous by design). + +**Impact:** Minimal under normal conditions. `PipeWriter.FlushAsync()` writes to an in-memory Kestrel pipe (not directly to the network) and typically completes synchronously. Only blocks when the pipe's internal buffer hits its pause threshold (~1MB), which requires an extremely slow client + large payload. The `Bytes` mode (default) has the same blocking characteristic — it blocks the thread for the entire serialization + single flush. + +**Possible optimization:** `AsyncSegment` mode (future) with a custom async `WriteMessageAsync` protocol interface, enabling `await` on flush instead of `GetResult()`. + +### SER-3: AsyncPipeWriterOutput fallback path — same as SER-1 + +**Status:** Acceptable +**Affects:** `AsyncPipeWriterOutput.AcquireChunk` fallback + +Same `TryGetArray` fallback as `BufferWriterBinaryOutput` (SER-1). Kestrel `PipeWriter.GetMemory()` always returns array-backed memory — fallback is for non-standard `PipeWriter` implementations only. + +## Deserialization (PipeReader) + +### DESER-5: PipeReaderBinaryInput uses sync ReadAsync().GetResult() + +**Status:** By design (v1) +**Affects:** `PipeReaderBinaryInput.Initialize()` and `TryAdvanceSegment()` + +Same constraint as SER-2 — `IBinaryInputBase` interface is synchronous. `ReadAsync().GetAwaiter().GetResult()` blocks when waiting for more data from the pipe. Currently not used in production (SignalR delivers complete messages via `TryParseMessage`). Reserved for future direct-pipe deserialization scenarios. + ## Source Generator (SGen) ### SGEN-1: CS8625 warnings for non-nullable reference types diff --git a/AyCode.Core/docs/BINARY_WRITERS.md b/AyCode.Core/docs/BINARY_WRITERS.md index ed0cf9e..746f0ec 100644 --- a/AyCode.Core/docs/BINARY_WRITERS.md +++ b/AyCode.Core/docs/BINARY_WRITERS.md @@ -99,5 +99,31 @@ void Release(); - **ArrayBinaryInput:** single `byte[]`, `TryAdvanceSegment => false` (JIT-eliminated), `Release` no-op. - **SequenceBinaryInput:** lazy `TryGet` iteration over `ReadOnlySequence`. Context `_buffer` points to segment backing `byte[]` (zero-copy). Cross-boundary: `ArrayPool` scratch, N-segment loop. `Release` returns scratch to pool. +- **PipeReaderBinaryInput:** reads from `PipeReader` with on-demand data via `ReadAsync`. Same cross-boundary pattern as `SequenceBinaryInput`, but when all segments in the current `ReadResult` are exhausted, calls `AdvanceTo` + `ReadAsync().GetAwaiter().GetResult()` to get more data from the pipe. Enables pipeline parallelism with `AsyncPipeWriterOutput`: deserializer processes chunks as they arrive from the network instead of waiting for the full payload. `Release` returns scratch to pool + signals pipe consumption via `AdvanceTo`. + +## AsyncPipeWriterOutput + +`struct AsyncPipeWriterOutput : IBinaryOutputBase` — writes to `PipeWriter` with per-chunk network flush. Enables segment-level streaming: each serializer chunk goes to the network immediately. + +### Differences from BufferWriterBinaryOutput + +Same cached chunk pattern (`GetMemory` → `TryGetArray` → direct array writes), but `Grow()` flushes the current chunk to the network before acquiring the next: + +1. Wait for previous flush if still in-flight (`_lastFlush` backpressure) +2. `Advance(bytesInChunk)` — commit to `PipeWriter` +3. `FlushAsync().Forget()` — fire-and-forget network send +4. Acquire next chunk via `GetMemory` + +**First-Grow skip:** the first `Grow()` call does NOT flush — the length prefix span (reserved by the protocol before serialization) must stay valid until patching. `_firstGrow` flag controls this. + +**Backpressure:** `_lastFlush` (ValueTask) tracks the most recent flush. If the serializer produces chunks faster than the network consumes them, the next `Grow()` waits — max ~2 chunks in memory at any time. + +### Usage + +Selected via `BinaryProtocolMode.Segment` in `AcBinaryHubProtocol`. The protocol casts `IBufferWriter output` to `PipeWriter` (safe — SignalR always provides `PipeWriter`). + +```csharp +AcBinarySerializer.Serialize(value, (PipeWriter)output, options) // AsyncPipeWriterOutput path +``` > Known issues and limitations: `BINARY_ISSUES.md` diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 4358c2b..cd8b8a1 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -47,13 +47,15 @@ public class AcBinaryHubProtocol : IHubProtocol private const byte MsgSequence = 9; protected volatile AcBinarySerializerOptions _options; + protected readonly BinaryProtocolMode _protocolMode; public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } - public AcBinaryHubProtocol(AcBinarySerializerOptions options) + public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes) { _options = options; _options.BufferWriterChunkSize = 4096; + _protocolMode = protocolMode; } /// @@ -417,7 +419,9 @@ public class AcBinaryHubProtocol : IHubProtocol var argLenSpan = output.GetSpan(LengthPrefixSize); output.Advance(LengthPrefixSize); - var argBytes = AcBinarySerializer.Serialize(value, output, _options); + var argBytes = _protocolMode == BinaryProtocolMode.Segment + ? AcBinarySerializer.Serialize(value, (System.IO.Pipelines.PipeWriter)output, _options) + : AcBinarySerializer.Serialize(value, output, _options); Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes); externalBytes += LengthPrefixSize + argBytes; diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs index f090de6..f8312bf 100644 --- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs @@ -18,7 +18,7 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol private SignalParams? _currentSignalParams; public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } - public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options) : base(options) { } + public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes) : base(options, protocolMode) { } protected override void OnArgumentRead(object? value, int index) { diff --git a/AyCode.Services/SignalRs/BinaryProtocolMode.cs b/AyCode.Services/SignalRs/BinaryProtocolMode.cs new file mode 100644 index 0000000..a11e80e --- /dev/null +++ b/AyCode.Services/SignalRs/BinaryProtocolMode.cs @@ -0,0 +1,16 @@ +namespace AyCode.Services.SignalRs; + +/// +/// Controls how the binary protocol transports serialized data over the network. +/// +public enum BinaryProtocolMode +{ + /// Standard: serialize → egyben küld/fogad. + Bytes = 0, + + /// Szinkron segment streaming: flush Grow()-ban → chunk-onként hálózatra. + Segment = 1, + + /// Async segment streaming: async serializer + async output (jövő). + AsyncSegment = 2, +} diff --git a/AyCode.Services/docs/SIGNALR.md b/AyCode.Services/docs/SIGNALR.md index e850d48..8c7aea5 100644 --- a/AyCode.Services/docs/SIGNALR.md +++ b/AyCode.Services/docs/SIGNALR.md @@ -70,7 +70,7 @@ CRUD helpers (`PostAsync`, `GetByIdAsync`, `GetAllAsync`, `PostDataAsync`) are g ### AcBinaryHubProtocol / AyCodeBinaryHubProtocol -Custom `IHubProtocol` (`"acbinary"`), replaces JSON. Zero-copy write via `BufferWriterBinaryOutput` standalone mode + `AcBinarySerializer.Serialize(value, output)` directly to pipe. Zero-copy read via `SequenceReader` from pipe's `ReadOnlySequence`. +Custom `IHubProtocol` (`"acbinary"`), replaces JSON. Zero-copy write via `BufferWriterBinaryOutput` standalone mode + `AcBinarySerializer.Serialize(value, output)` directly to pipe. Zero-copy read via `SequenceReader` from pipe's `ReadOnlySequence`. `BinaryProtocolMode` constructor parameter selects transport strategy: `Bytes` (default, single flush), `Segment` (per-chunk flush via `AsyncPipeWriterOutput`), `AsyncSegment` (reserved). `AcBinaryHubProtocol` is the base (unsealed) — general binary framing only. `AyCodeBinaryHubProtocol` derives from it with consumer-specific logic: `SignalParams` capture (via `OnArgumentRead` hook), `IsRawBytesData` path, `SignalDataType` type resolution. Register `AyCodeBinaryHubProtocol` in both client and server. @@ -182,6 +182,7 @@ Type-guided deserialization — each parameter is individually serialized/deseri | Client base | `SignalRs/AcSignalRClientBase.cs` | | Binary protocol (base) | `SignalRs/AcBinaryHubProtocol.cs` | | Binary protocol (derived) | `SignalRs/AyCodeBinaryHubProtocol.cs` | +| Protocol mode enum | `SignalRs/BinaryProtocolMode.cs` | | Tag attributes | `SignalRs/SignalMessageTagAttribute.cs` | | Base tags | `SignalRs/AcSignalRTags.cs` | | CRUD tags | `SignalRs/SignalRCrudTags.cs` | diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md index 77132c0..803a8e4 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md @@ -158,4 +158,16 @@ Typical overhead for 225KB payload with 4096-byte segments: ~224.5KB zero-copy, | `BufferWriterChunkSize` | 65536 | 4096 | Chunk size for BWO. SignalR sets 4096 in `AcBinaryHubProtocol` constructor. | | `InitialBufferCapacity` | 16384 | — | Starting buffer for `ArrayBinaryOutput` (byte[] serialize path) | -**Source:** `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` (base), `AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs` (consumer logic) +## BinaryProtocolMode + +`enum BinaryProtocolMode` — constructor parameter for `AcBinaryHubProtocol`, selects transport strategy: + +| Value | Behavior | +|-------|----------| +| `Bytes` (default) | Standard: serialize to `BufferWriterBinaryOutput`, single flush at end. | +| `Segment` | Segment streaming: serialize to `AsyncPipeWriterOutput`, flush per 4096-byte chunk via `PipeWriter.FlushAsync().Forget()`. Network transfer overlaps with serialization. | +| `AsyncSegment` | Reserved for future async serializer. | + +In `Segment` mode, `WriteArgument` casts `IBufferWriter output` to `PipeWriter` and calls `AcBinarySerializer.Serialize(value, pipeWriter, options)` which uses `AsyncPipeWriterOutput` internally. The reader side currently uses the same `SequenceBinaryInput` path (SignalR delivers complete messages via `TryParseMessage`). `PipeReaderBinaryInput` is available for future direct-pipe deserialization. + +**Source:** `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` (base), `AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs` (consumer logic), `AyCode.Services/SignalRs/BinaryProtocolMode.cs` (enum)