From c6e1fa8efc101750937ccbc801a40f0d3dfd9292 Mon Sep 17 00:00:00 2001 From: Loretta Date: Mon, 20 Apr 2026 17:44:37 +0200 Subject: [PATCH] Refactor: centralize SignalR protocol config/options - Added AcBinaryHubProtocolOptions for unified protocol configuration (serializer, mode, buffer size, flush strategy, timeout, name, logger) with validation and DI support. - Refactored AcBinaryHubProtocol and AyCodeBinaryHubProtocol to use options object; legacy constructors now delegate to options-based API. - Added per-chunk flush timeout to AsyncPipeWriterOutput and AcBinarySerializer; throws TimeoutException on slow consumers. - Improved XML docs and comments for pipeline/backpressure/timeout clarity. - Updated SIGNALR_BINARY_PROTOCOL.md to document new options and AsyncSegment platform rules. --- .../Binaries/AcBinarySerializer.cs | 19 ++- .../Binaries/AsyncPipeWriterOutput.cs | 57 ++++++--- .../SignalRs/AcBinaryHubProtocol.cs | 89 ++++++++++++-- .../SignalRs/AcBinaryHubProtocolOptions.cs | 113 ++++++++++++++++++ .../SignalRs/AyCodeBinaryHubProtocol.cs | 18 ++- .../docs/SIGNALR_BINARY_PROTOCOL.md | 49 ++++++-- 6 files changed, 300 insertions(+), 45 deletions(-) create mode 100644 AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index c5b2fbe..26bf907 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -423,15 +423,28 @@ public static partial class AcBinarySerializer } /// - /// Serialize to PipeWriter with chunked protocol framing via AsyncPipeWriterOutput. + /// Serialize to PipeWriter with chunked protocol framing via . /// Each chunk (including the last) is framed as [201][UINT16 size][data] and committed /// to the PipeWriter via Advance (zero-copy). The protocol layer writes a single [202] /// byte after to signal end-of-stream. /// + /// The value to serialize; null writes a single null marker. + /// Target pipe (typically Kestrel's transport output). + /// Serializer options (type wrappers, reference handling, interning, etc.). + /// + /// Per-chunk flush synchronization. true (default): maximum pipeline parallelism, + /// guaranteed zero-copy, but slow consumers block the server thread (bounded by ). + /// false: adaptive backpressure via memory threshold — safer for mixed consumer speeds. + /// + /// + /// Per-flush timeout. null → wait forever (legacy). Positive value: throws + /// on stuck consumers. + /// /// Total serialized data bytes (excluding framing overhead). public static int Serialize( T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options, - bool waitForFlush = true) + bool waitForFlush = true, + TimeSpan? flushTimeout = null) { if (value == null) { @@ -444,7 +457,7 @@ public static partial class AcBinarySerializer var runtimeType = value.GetType(); var context = BinarySerializationContextPool.Get(options); - context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush); + context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush, flushTimeout); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); try diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index 1b959fb..b54c8b4 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -9,24 +9,32 @@ using System.Threading.Tasks; namespace AyCode.Core.Serializers.Binaries; /// -/// Binary output that writes to a PipeWriter with per-chunk network flush and self-describing framing. +/// Binary output that writes to a PipeWriter with per-chunk self-describing framing. /// /// Each chunk (including the last) is framed as [201][UINT16 size][data] with a 3-byte header /// reserved at the start of each buffer. The serializer context writes into the space after the -/// reserved bytes; on Grow(), the header is patched and the full chunk is committed via Advance -/// and flushed to the network. Flush() does the same for the last (partial) chunk — zero-copy -/// for both intermediate and final chunks. +/// reserved bytes; on , the header is patched and the full chunk is committed via +/// Advance (zero-copy). does the same for the last (partial) chunk. /// /// The protocol layer writes a single [202] byte after all chunks to signal end-of-stream. /// -/// Backpressure modes (controlled by waitForFlush constructor parameter): +/// Backpressure modes (controlled by waitForFlush): /// -/// waitForFlush=true (default): Grow() blocks if the previous FlushAsync hasn't completed. -/// Bounds memory to ~2 chunks in flight. -/// waitForFlush=false: Grow() never blocks. Data accumulates in the PipeWriter's internal -/// buffer and is sent with the next completed flush. Maximum serialization throughput. +/// waitForFlush=true (default): Grow() waits for the previous FlushAsync before +/// starting a new chunk. Pro: maximum pipeline parallelism, guaranteed end-to-end zero-copy. +/// Con: slow consumer propagates back as server-thread blocking (bounded by flushTimeout). +/// waitForFlush=false: Grow() is fire-and-forget per chunk; only blocks when committed +/// bytes exceed ~60 KB (memory threshold — itself an adaptive backpressure). +/// Pro: no per-chunk waits, safer with mixed consumer speeds. +/// Con: under heavy backpressure may fall back to an owned buffer, losing zero-copy +/// for that chunk. /// /// +/// Timeout safety: every synchronous flush-await is bounded by flushTimeout +/// (default when the type is used directly; +/// passes 10 s from its options). A +/// propagates to the caller, allowing the connection to abort instead of blocking forever. +/// /// Maximum chunk data size: 65535 bytes (UINT16 max). /// public struct AsyncPipeWriterOutput : IBinaryOutputBase @@ -43,12 +51,19 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private readonly PipeWriter _pipeWriter; private readonly int _chunkSize; private readonly bool _waitForFlush; + private readonly TimeSpan _flushTimeout; private int _committedBytes; private int _currentChunkStart; private bool _ownedBuffer; private ValueTask _lastFlush; - public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true) + /// Creates an output bound to the given PipeWriter with self-describing chunked framing. + /// Target pipe (typically Kestrel's transport output for SignalR). + /// Per-chunk data size (max ). Default 4 KB matches Kestrel's slab size. + /// See class summary — pipeline parallelism (true) vs adaptive (false). + /// Per-flush timeout. null + /// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers. + public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true, TimeSpan? flushTimeout = null) { if (chunkSize > MaxChunkSize) throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, @@ -57,6 +72,9 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase _pipeWriter = pipeWriter; _chunkSize = chunkSize; _waitForFlush = waitForFlush; + // null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms). + // A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller. + _flushTimeout = flushTimeout ?? System.Threading.Timeout.InfiniteTimeSpan; _committedBytes = 0; _ownedBuffer = false; _lastFlush = default; @@ -65,13 +83,24 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase /// /// Synchronously awaits a FlushAsync ValueTask. /// Fast-path: if already completed, returns without Task allocation. - /// Slow-path: converts to Task for proper blocking (backpressure). + /// Slow-path: blocks with — throws + /// if the flush does not complete within it (guards against slow/stuck/disconnected consumers). + /// means "wait forever" (natively supported by Task.Wait). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static void SyncAwaitFlush(ValueTask vt) + private void SyncAwaitFlush(ValueTask vt) { - if (!vt.IsCompletedSuccessfully) - vt.AsTask().GetAwaiter().GetResult(); + if (vt.IsCompletedSuccessfully) return; + + var task = vt.AsTask(); + + if (!task.Wait(_flushTimeout)) + throw new TimeoutException( + $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + + "consumer may be too slow, stuck, or disconnected."); + + // Completed within timeout — propagate any faulted exception + task.GetAwaiter().GetResult(); } /// diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index e26b953..5202c34 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -76,6 +76,17 @@ public class AcBinaryHubProtocol : IHubProtocol protected readonly BinaryProtocolMode _protocolMode; protected readonly ILogger? _logger; + /// + /// AsyncSegment per-chunk flush synchronization — see . + /// + protected readonly bool _waitForFlush; + + /// + /// Per-flush wait limit — see . + /// Guaranteed positive or by . + /// + protected readonly TimeSpan _flushTimeout; + /// /// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly). /// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server. @@ -108,10 +119,38 @@ public class AcBinaryHubProtocol : IHubProtocol public int ChunkFrameBytesConsumed; } - public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } + /// + /// Parameterless constructor — creates the protocol with all-default options + /// (, 4 KB buffer, 10 s flush timeout, "acbinary" name). + /// Mainly for tests and simple scenarios. For production, pass an explicit + /// or configure via DI. + /// + public AcBinaryHubProtocol() : this(new AcBinaryHubProtocolOptions()) { } + /// + /// Legacy constructor — wraps the arguments into + /// and delegates to the options-based constructor. Kept for backward compatibility; + /// will be removed in a future version in favor of the options-based API. + /// public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) + : this(new AcBinaryHubProtocolOptions + { + SerializerOptions = options, + ProtocolMode = protocolMode, + Logger = logger, + BufferSize = 4096 + // FlushTimeout, WaitForFlush, Name — use options defaults (30s, true, "acbinary") + }) + { } + + /// + /// Primary constructor. All configuration flows through . + /// + public AcBinaryHubProtocol(AcBinaryHubProtocolOptions options) { + if (options is null) throw new ArgumentNullException(nameof(options)); + options.Validate(); + // Send-side guard: AsyncSegment uses AsyncPipeWriterOutput whose sync-over-async flush // would block the browser's single UI thread. The receive side converts chunked wire // to a synchronous deserialize on WASM automatically (see TryParseChunkData). @@ -119,22 +158,26 @@ public class AcBinaryHubProtocol : IHubProtocol // TEMP: commented out to test AsyncSegment on both Windows app and WASM without rebuild. // Small WASM payloads work; larger ones may deadlock on sync-over-async FlushAsync. // Restore once BinaryProtocolMode is runtime-configurable in Program.cs. - //if (IsBrowser && protocolMode == BinaryProtocolMode.AsyncSegment) + //if (IsBrowser && options.ProtocolMode == BinaryProtocolMode.AsyncSegment) // throw new PlatformNotSupportedException( // "BinaryProtocolMode.AsyncSegment is not supported on WebAssembly. " + // "Use BinaryProtocolMode.Bytes or BinaryProtocolMode.Segment instead."); - _options = options; - _options.BufferWriterChunkSize = 4096; - _protocolMode = protocolMode; - _logger = logger; + _options = options.SerializerOptions; + _options.BufferWriterChunkSize = options.BufferSize; + _protocolMode = options.ProtocolMode; + _logger = options.Logger; + _waitForFlush = options.WaitForFlush; + _flushTimeout = options.FlushTimeout; + Name = options.Name; _chunkStates = new ConditionalWeakTable(); if (_logger != null) { _logger.LogInformation( - "AcBinaryHubProtocol initialized mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", - _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, + "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", + Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, + _waitForFlush, _flushTimeout.TotalMilliseconds, _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); } } @@ -149,18 +192,38 @@ public class AcBinaryHubProtocol : IHubProtocol set => _options = value; } - public string Name => "acbinary"; + /// Protocol name sent in SignalR handshake. Set via . Default: "acbinary". + public string Name { get; } = "acbinary"; + public int Version => 1; public TransferFormat TransferFormat => TransferFormat.Binary; /// /// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask. /// Fast-path: if already completed (no backpressure), returns directly without Task allocation. - /// Slow-path: converts to Task for proper blocking when pipe backpressure is active. + /// Slow-path: blocks with — throws + /// if the flush does not complete within the timeout (protects against slow/stuck/disconnected + /// consumers holding the server thread indefinitely). + /// + /// guarantees _flushTimeout is either + /// positive or (which Task.Wait + /// natively treats as "wait forever"), so no explicit zero-check is needed here. + /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static FlushResult SyncFlush(ValueTask vt) - => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult(); + private FlushResult SyncFlush(ValueTask vt) + { + if (vt.IsCompletedSuccessfully) return vt.Result; + + var task = vt.AsTask(); + + if (!task.Wait(_flushTimeout)) + throw new TimeoutException( + $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + + "consumer may be too slow, stuck, or disconnected."); + + return task.GetAwaiter().GetResult(); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsVersionSupported(int version) => version <= Version; @@ -473,7 +536,7 @@ public class AcBinaryHubProtocol : IHubProtocol // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- if (streamedArg != null) { - dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options); + dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _waitForFlush, _flushTimeout); _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes); } diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs new file mode 100644 index 0000000..6a404cb --- /dev/null +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs @@ -0,0 +1,113 @@ +using AyCode.Core.Serializers.Binaries; +using Microsoft.Extensions.Logging; + +namespace AyCode.Services.SignalRs; + +/// +/// Configuration for and derived protocols. +/// Use via services.Configure<AcBinaryHubProtocolOptions>(...) in Program.cs, +/// or pass an instance directly to the protocol constructor. +/// +public sealed class AcBinaryHubProtocolOptions +{ + // --- Serializer (standalone sub-group, also usable without SignalR via ToBinary/BinaryTo) --- + + /// Binary serializer options. Default: . + public AcBinarySerializerOptions SerializerOptions { get; set; } = AcBinarySerializerOptions.Default; + + // --- Transport --- + + /// Wire format and pipeline strategy. Default: . + public BinaryProtocolMode ProtocolMode { get; set; } = BinaryProtocolMode.Bytes; + + /// Chunk size for BufferWriterBinaryOutput / AsyncPipeWriterOutput. + /// Default: 4096 (aligns with Kestrel's slab size for optimal latency-to-first-byte). + public int BufferSize { get; set; } = 4096; + + /// + /// Per-chunk flush synchronization in mode. + /// + /// true (default): every Grow waits for the previous FlushAsync to complete + /// before starting the next chunk. Guarantees end-to-end zero-copy (no owned-buffer fallback) + /// and maximum pipeline parallelism. Best for high-throughput servers with fast consumers. + /// false: fire-and-forget flush per chunk; blocks only when committed bytes exceed + /// the memory threshold (~60 KB). Itself an adaptive backpressure mode — a fast consumer + /// never triggers a wait, a slow consumer naturally throttles through buffer pressure. + /// Safer for mixed consumer speeds and memory-sensitive environments. + /// + /// Ignored for Bytes and Segment modes. + /// + public bool WaitForFlush { get; set; } = true; + + /// + /// Maximum wait for a single synchronous FlushAsync before throwing + /// . Protects against slow/stuck/disconnected consumers + /// blocking the server thread indefinitely. + /// + /// Default: 10 seconds. Rationale: AsyncSegment chunks are max 65 KB (UINT16 size field); + /// even GPRS-class connections (~60 Kbit/s) transfer 65 KB in ~9 s. If a flush exceeds 10 s, + /// the consumer is effectively stuck — faster failure detection is preferable to indefinite blocking. + /// + /// + /// Note: complementary to SignalR's connection-level timeouts (ClientTimeoutInterval, + /// KeepAliveInterval). This is a per-operation guard; SignalR timeouts manage the overall + /// connection lifetime. Set FlushTimeout < ClientTimeoutInterval so this guard fires first. + /// + /// Set to to disable. + /// + public TimeSpan FlushTimeout { get; set; } = TimeSpan.FromSeconds(10); + + // --- Identity --- + + /// Protocol name in the SignalR handshake. Client and server must match. Default: "acbinary". + public string Name { get; set; } = "acbinary"; + + // --- Diagnostics --- + + /// Optional logger. When null, no protocol-internal logging occurs. + public ILogger? Logger { get; set; } + + // --- Validation --- + + /// + /// Validates option values and platform compatibility. + /// Throws / on invalid values, + /// or for unsupported platform/mode combinations. + /// + public void Validate() + { + // NOTE: WASM + AsyncSegment send-path guard is currently commented out in the protocol + // constructor for testing. Once BinaryProtocolMode becomes runtime-configurable in + // Program.cs, this validation will be re-enabled here as the primary guard. + //if (OperatingSystem.IsBrowser() && ProtocolMode == BinaryProtocolMode.AsyncSegment) + // throw new PlatformNotSupportedException( + // "BinaryProtocolMode.AsyncSegment is not supported on WebAssembly. " + + // "Use BinaryProtocolMode.Bytes or BinaryProtocolMode.Segment instead."); + + if (BufferSize < 256 || BufferSize > AsyncPipeWriterOutput.MaxChunkSize) + throw new ArgumentOutOfRangeException(nameof(BufferSize), BufferSize, + $"BufferSize must be between 256 and {AsyncPipeWriterOutput.MaxChunkSize} (UINT16 max)."); + + if (FlushTimeout <= TimeSpan.Zero && FlushTimeout != System.Threading.Timeout.InfiniteTimeSpan) + throw new ArgumentOutOfRangeException(nameof(FlushTimeout), FlushTimeout, + "FlushTimeout must be positive, or Timeout.InfiniteTimeSpan to disable."); + + if (string.IsNullOrWhiteSpace(Name)) + throw new ArgumentException("Name cannot be null or whitespace.", nameof(Name)); + } + + /// + /// Creates a shallow copy. Required for DI IOptions<T> integration — + /// the singleton from DI must not be mutated by per-connection configure callbacks. + /// + public AcBinaryHubProtocolOptions Clone() => new() + { + SerializerOptions = SerializerOptions, + ProtocolMode = ProtocolMode, + BufferSize = BufferSize, + WaitForFlush = WaitForFlush, + FlushTimeout = FlushTimeout, + Name = Name, + Logger = Logger + }; +} diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs index b34b495..aaeb2ac 100644 --- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs @@ -24,8 +24,22 @@ namespace AyCode.Services.SignalRs; /// public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol { - public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } - public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { } + /// + /// Parameterless constructor — creates the protocol with all-default options. See base class. + /// + public AyCodeBinaryHubProtocol() : base() { } + + /// + /// Legacy constructor — delegates to the base legacy constructor, which wraps into + /// . Kept for backward compatibility. + /// + public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) + : base(options, protocolMode, logger) { } + + /// + /// Primary constructor — accepts a fully-configured . + /// + public AyCodeBinaryHubProtocol(AcBinaryHubProtocolOptions options) : base(options) { } #region Wire header (per-message) diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md index ebec83d..b740daa 100644 --- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md +++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md @@ -150,25 +150,48 @@ Typical overhead for 225KB payload with 4096-byte segments: ~224.5KB zero-copy, > Known issues: `AyCode.Core/docs/BINARY_ISSUES.md` -## Config +## Configuration -| Property | Default | SignalR | Purpose | -|----------|---------|---------|---------| -| `Options` | `AcBinarySerializerOptions.Default` | — | Serializer options (volatile, runtime-replaceable) | -| `BufferWriterChunkSize` | 65536 | 4096 | Chunk size for BWO. SignalR sets 4096 in `AcBinaryHubProtocol` constructor. | -| `InitialBufferCapacity` | 16384 | — | Starting buffer for `ArrayBinaryOutput` (byte[] serialize path) | +Hub protocol settings are controlled via **`AcBinaryHubProtocolOptions`** (mutable class). Pass directly to the protocol constructor, or configure via DI in `Program.cs` with `services.Configure(opts => …)`. + +| Property | Default | Purpose | +|----------|---------|---------| +| `SerializerOptions` | `AcBinarySerializerOptions.Default` | Binary serializer options (also usable standalone via `ToBinary`/`BinaryTo`). | +| `ProtocolMode` | `Bytes` | Wire format and pipeline strategy — see **BinaryProtocolMode** below. | +| `BufferSize` | 4096 | Per-chunk size. 4 KB aligns with Kestrel's slab. Max 65535 (UINT16). | +| `WaitForFlush` | `true` | AsyncSegment flush strategy — see trade-off below. | +| `FlushTimeout` | 10 s | Per-flush wait limit. `Timeout.InfiniteTimeSpan` = disabled. | +| `Name` | `"acbinary"` | SignalR handshake protocol name. Client and server must match. | +| `Logger` | `null` | Optional `ILogger`; injected from DI when registered. | + +Inner `AcBinarySerializerOptions` defaults relevant for SignalR: `UseGeneratedCode=true` (hybrid source-gen + reflection), `UseStringInterning=All`, `InitialBufferCapacity=16384`. + +### `WaitForFlush` (AsyncSegment-only) + +| Value | Pro | Con | +|-------|-----|-----| +| **`true`** (default) | Max pipeline parallelism + guaranteed end-to-end zero-copy on send. | Slow consumer propagates back as server-thread blocking (bounded by `FlushTimeout`). | +| **`false`** | Adaptive — fire-and-forget per chunk, blocks only when ~60 KB memory threshold is hit. | Under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. | + +### `FlushTimeout` rationale (10 s default) + +- AsyncSegment chunks are ≤ 65 KB (UINT16). Even GPRS-class links (~60 Kbit/s) transfer 65 KB in ~9 s — so any flush exceeding 10 s indicates a genuinely stuck consumer. +- **Pro**: fast failure detection; server thread never blocks indefinitely. +- **Con**: an unusually slow but otherwise healthy consumer will be disconnected — tune up for satellite / throttled links. +- Complementary to SignalR's connection-level timeouts (`ClientTimeoutInterval`, `KeepAliveInterval`). Set `FlushTimeout < ClientTimeoutInterval` so this per-operation guard fires first. +- Set to `Timeout.InfiniteTimeSpan` to fully disable (legacy behavior). ## BinaryProtocolMode `enum BinaryProtocolMode` — constructor parameter for `AcBinaryHubProtocol`, selects serialization + transport strategy: -| Value | Serialize | Deserialize | Characteristics | -|-------|-----------|-------------|-----------------| -| `Bytes` (default) | `ArrayBinaryOutput` → `byte[]` → write to pipe as raw blob | `SequenceReader.ToArray()` → `ArrayBinaryInput` (single contiguous buffer, `TryAdvanceSegment` → false, JIT-eliminated) | Fastest individual ser/deser. No zerocopy. No pipeline overlap. | -| `Segment` | `BufferWriterBinaryOutput` → directly to `PipeWriter`, chunk-by-chunk, single `Flush` at end | `SequenceBinaryInput` → multi-segment `ReadOnlySequence` (lazy `TryGet` iteration, cross-boundary scratch) | Zerocopy write. No pipeline overlap. | -| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing via `PipeWriter`, per-chunk `FlushAsync().Forget()` | `PipeReaderBinaryInput` from internal `Pipe` → background `Task` deser, on-demand `ReadAsync` | Zerocopy write + pipeline parallelism (ser/network/deser overlap). Max chunk: 65535 bytes (UINT16). | +| Value | Serialize | Deserialize (non-WASM) | Pro / Con | +|-------|-----------|----------------------|-----------| +| `Bytes` (default) | `ArrayBinaryOutput` → `byte[]` → write to pipe as raw blob | `ArrayBinaryInput` (single contiguous buffer via `MemoryMarshal.TryGetArray` zero-copy / pool-rent). | **Pro**: simplest, fastest per-call, WASM-safe on both sides. **Con**: no zero-copy write, no pipeline overlap. | +| `Segment` | `BufferWriterBinaryOutput` → directly to `PipeWriter`, chunk-by-chunk, single `Flush` at end | Same as Bytes (unified `ArrayBinaryInput` receive path — `_protocolMode` affects send only). | **Pro**: zero-copy write, WASM-safe. **Con**: no pipeline overlap — receiver must wait for full payload before deser starts. | +| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing `[201][UINT16 size][data]` per chunk, per-chunk `FlushAsync` with timeout-bounded sync-await | `SegmentBufferReader` (growing contiguous byte[]) + `SegmentBufferReaderInput`; background `Task.Run` deserializes while chunks arrive. WASM: synchronous deser on `CHUNK_END`. | **Pro**: zero-copy write + pipeline parallelism (ser / network / deser overlap). **Con**: send-side not WASM-compatible (see below); slow consumer propagates as server-thread blocking (bounded by `FlushTimeout`). Max chunk: 65535 bytes. | -In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA — `[201][UINT16 size][data]` per chunk (written by `AsyncPipeWriterOutput` with 3-byte header reservation, zero-copy), (3) CHUNK_END — `[202]` (1 byte, no data). The receiver's `TryParseMessage` enters chunk accumulation mode after CHUNK_START, feeding data to an internal `Pipe` where a background `Task` deserializes via `PipeReaderBinaryInput`. +In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA — `[201][UINT16 size][data]` per chunk (zero-copy via `PipeWriter.Advance` with 3-byte header reservation), (3) CHUNK_END — `[202]` (1 byte). The receiver's `TryParseChunkData` accumulates into a `SegmentBufferReader`; on non-WASM platforms a background `Task.Run` deserializes in parallel via `SegmentBufferReaderInput`, on WASM the deserializer runs synchronously on `CHUNK_END` over the already-buffered data. In `Bytes` and `Segment` mode, the standard `WriteMessage` path is used. @@ -176,7 +199,7 @@ In `Bytes` and `Segment` mode, the standard `WriteMessage` path is used. The send and receive paths handle WASM (`OperatingSystem.IsBrowser()`) asymmetrically — **send** is strictly bound to `_protocolMode`, **receive** adapts to the wire format and falls back to a synchronous path only when the platform cannot support the optimal strategy. -- **Send path**: `AsyncSegment` is **not supported on WebAssembly**. The constructor throws `PlatformNotSupportedException` if `IsBrowser && protocolMode == AsyncSegment` (the `AsyncPipeWriterOutput.SyncAwaitFlush` sync-over-async pattern would block the single UI thread). WASM clients must use `Bytes` or `Segment`. +- **Send path**: `AsyncSegment` is **not supported on WebAssembly**. The `AcBinaryHubProtocolOptions.Validate()` method throws `PlatformNotSupportedException` if `IsBrowser && ProtocolMode == AsyncSegment` (the `AsyncPipeWriterOutput.SyncAwaitFlush` sync-over-async pattern would block the single UI thread). WASM clients must use `Bytes` or `Segment`. *(Note: this guard is currently commented out in `Validate()` to enable hybrid Windows-app + WASM testing against a single protocol instance. Will be re-enabled once the options are fully wired through `Program.cs`.)* - **Receive path**: works on WASM with **any** server-side mode (including `AsyncSegment` → chunked wire). `TryParseChunkData` detects the platform at runtime: - **Non-browser**: first `CHUNK_DATA` spawns a background `Task.Run` over a `SegmentBufferReader` (pipeline parallelism — serialize / network / deserialize overlap). `CHUNK_END` awaits the task's result. - **Browser**: the background task is skipped. Chunks accumulate in `SegmentBufferReader`; on `CHUNK_END` the buffer is `Complete()`d and the deserializer runs synchronously on the current thread. `SegmentBufferReaderInput.TryAdvanceSegment` sees `_completed=true` and never calls `ManualResetEventSlim.Wait()` (which throws `PlatformNotSupportedException` on WASM).