diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs
index c5b2fbe..26bf907 100644
--- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs
+++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs
@@ -423,15 +423,28 @@ public static partial class AcBinarySerializer
}
///
- /// Serialize to PipeWriter with chunked protocol framing via AsyncPipeWriterOutput.
+ /// Serialize to PipeWriter with chunked protocol framing via .
/// Each chunk (including the last) is framed as [201][UINT16 size][data] and committed
/// to the PipeWriter via Advance (zero-copy). The protocol layer writes a single [202]
/// byte after to signal end-of-stream.
///
+ /// The value to serialize; null writes a single null marker.
+ /// Target pipe (typically Kestrel's transport output).
+ /// Serializer options (type wrappers, reference handling, interning, etc.).
+ ///
+ /// Per-chunk flush synchronization. true (default): maximum pipeline parallelism,
+ /// guaranteed zero-copy, but slow consumers block the server thread (bounded by ).
+ /// false: adaptive backpressure via memory threshold — safer for mixed consumer speeds.
+ ///
+ ///
+ /// Per-flush timeout. null → wait forever (legacy). Positive value: throws
+ /// on stuck consumers.
+ ///
/// Total serialized data bytes (excluding framing overhead).
public static int Serialize(
T value, System.IO.Pipelines.PipeWriter pipeWriter, AcBinarySerializerOptions options,
- bool waitForFlush = true)
+ bool waitForFlush = true,
+ TimeSpan? flushTimeout = null)
{
if (value == null)
{
@@ -444,7 +457,7 @@ public static partial class AcBinarySerializer
var runtimeType = value.GetType();
var context = BinarySerializationContextPool.Get(options);
- context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush);
+ context.Output = new AsyncPipeWriterOutput(pipeWriter, options.BufferWriterChunkSize, waitForFlush, flushTimeout);
context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd);
try
diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
index 1b959fb..b54c8b4 100644
--- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
+++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
@@ -9,24 +9,32 @@ using System.Threading.Tasks;
namespace AyCode.Core.Serializers.Binaries;
///
-/// Binary output that writes to a PipeWriter with per-chunk network flush and self-describing framing.
+/// Binary output that writes to a PipeWriter with per-chunk self-describing framing.
///
/// Each chunk (including the last) is framed as [201][UINT16 size][data] with a 3-byte header
/// reserved at the start of each buffer. The serializer context writes into the space after the
-/// reserved bytes; on Grow(), the header is patched and the full chunk is committed via Advance
-/// and flushed to the network. Flush() does the same for the last (partial) chunk — zero-copy
-/// for both intermediate and final chunks.
+/// reserved bytes; on , the header is patched and the full chunk is committed via
+/// Advance (zero-copy). does the same for the last (partial) chunk.
///
/// The protocol layer writes a single [202] byte after all chunks to signal end-of-stream.
///
-/// Backpressure modes (controlled by waitForFlush constructor parameter):
+/// Backpressure modes (controlled by waitForFlush):
///
-/// - waitForFlush=true (default): Grow() blocks if the previous FlushAsync hasn't completed.
-/// Bounds memory to ~2 chunks in flight.
-/// - waitForFlush=false: Grow() never blocks. Data accumulates in the PipeWriter's internal
-/// buffer and is sent with the next completed flush. Maximum serialization throughput.
+/// - waitForFlush=true (default): Grow() waits for the previous FlushAsync before
+/// starting a new chunk. Pro: maximum pipeline parallelism, guaranteed end-to-end zero-copy.
+/// Con: slow consumer propagates back as server-thread blocking (bounded by flushTimeout).
+/// - waitForFlush=false: Grow() is fire-and-forget per chunk; only blocks when committed
+/// bytes exceed ~60 KB (memory threshold — itself an adaptive backpressure).
+/// Pro: no per-chunk waits, safer with mixed consumer speeds.
+/// Con: under heavy backpressure may fall back to an owned buffer, losing zero-copy
+/// for that chunk.
///
///
+/// Timeout safety: every synchronous flush-await is bounded by flushTimeout
+/// (default when the type is used directly;
+/// passes 10 s from its options). A
+/// propagates to the caller, allowing the connection to abort instead of blocking forever.
+///
/// Maximum chunk data size: 65535 bytes (UINT16 max).
///
public struct AsyncPipeWriterOutput : IBinaryOutputBase
@@ -43,12 +51,19 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
private readonly PipeWriter _pipeWriter;
private readonly int _chunkSize;
private readonly bool _waitForFlush;
+ private readonly TimeSpan _flushTimeout;
private int _committedBytes;
private int _currentChunkStart;
private bool _ownedBuffer;
private ValueTask _lastFlush;
- public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true)
+ /// Creates an output bound to the given PipeWriter with self-describing chunked framing.
+ /// Target pipe (typically Kestrel's transport output for SignalR).
+ /// Per-chunk data size (max ). Default 4 KB matches Kestrel's slab size.
+ /// See class summary — pipeline parallelism (true) vs adaptive (false).
+ /// Per-flush timeout. null →
+ /// (wait forever — legacy behavior). Pass a positive value to fail fast on stuck consumers.
+ public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096, bool waitForFlush = true, TimeSpan? flushTimeout = null)
{
if (chunkSize > MaxChunkSize)
throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize,
@@ -57,6 +72,9 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
_pipeWriter = pipeWriter;
_chunkSize = chunkSize;
_waitForFlush = waitForFlush;
+ // null → Timeout.InfiniteTimeSpan ("wait forever" — natively supported by Task.Wait as -1ms).
+ // A positive value enables bounded waiting; on timeout a TimeoutException propagates to the caller.
+ _flushTimeout = flushTimeout ?? System.Threading.Timeout.InfiniteTimeSpan;
_committedBytes = 0;
_ownedBuffer = false;
_lastFlush = default;
@@ -65,13 +83,24 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
///
/// Synchronously awaits a FlushAsync ValueTask.
/// Fast-path: if already completed, returns without Task allocation.
- /// Slow-path: converts to Task for proper blocking (backpressure).
+ /// Slow-path: blocks with — throws
+ /// if the flush does not complete within it (guards against slow/stuck/disconnected consumers).
+ /// means "wait forever" (natively supported by Task.Wait).
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static void SyncAwaitFlush(ValueTask vt)
+ private void SyncAwaitFlush(ValueTask vt)
{
- if (!vt.IsCompletedSuccessfully)
- vt.AsTask().GetAwaiter().GetResult();
+ if (vt.IsCompletedSuccessfully) return;
+
+ var task = vt.AsTask();
+
+ if (!task.Wait(_flushTimeout))
+ throw new TimeoutException(
+ $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " +
+ "consumer may be too slow, stuck, or disconnected.");
+
+ // Completed within timeout — propagate any faulted exception
+ task.GetAwaiter().GetResult();
}
///
diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
index e26b953..5202c34 100644
--- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
+++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
@@ -76,6 +76,17 @@ public class AcBinaryHubProtocol : IHubProtocol
protected readonly BinaryProtocolMode _protocolMode;
protected readonly ILogger? _logger;
+ ///
+ /// AsyncSegment per-chunk flush synchronization — see .
+ ///
+ protected readonly bool _waitForFlush;
+
+ ///
+ /// Per-flush wait limit — see .
+ /// Guaranteed positive or by .
+ ///
+ protected readonly TimeSpan _flushTimeout;
+
///
/// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).
/// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server.
@@ -108,10 +119,38 @@ public class AcBinaryHubProtocol : IHubProtocol
public int ChunkFrameBytesConsumed;
}
- public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
+ ///
+ /// Parameterless constructor — creates the protocol with all-default options
+ /// (, 4 KB buffer, 10 s flush timeout, "acbinary" name).
+ /// Mainly for tests and simple scenarios. For production, pass an explicit
+ /// or configure via DI.
+ ///
+ public AcBinaryHubProtocol() : this(new AcBinaryHubProtocolOptions()) { }
+ ///
+ /// Legacy constructor — wraps the arguments into
+ /// and delegates to the options-based constructor. Kept for backward compatibility;
+ /// will be removed in a future version in favor of the options-based API.
+ ///
public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null)
+ : this(new AcBinaryHubProtocolOptions
+ {
+ SerializerOptions = options,
+ ProtocolMode = protocolMode,
+ Logger = logger,
+ BufferSize = 4096
+ // FlushTimeout, WaitForFlush, Name — use options defaults (30s, true, "acbinary")
+ })
+ { }
+
+ ///
+ /// Primary constructor. All configuration flows through .
+ ///
+ public AcBinaryHubProtocol(AcBinaryHubProtocolOptions options)
{
+ if (options is null) throw new ArgumentNullException(nameof(options));
+ options.Validate();
+
// Send-side guard: AsyncSegment uses AsyncPipeWriterOutput whose sync-over-async flush
// would block the browser's single UI thread. The receive side converts chunked wire
// to a synchronous deserialize on WASM automatically (see TryParseChunkData).
@@ -119,22 +158,26 @@ public class AcBinaryHubProtocol : IHubProtocol
// TEMP: commented out to test AsyncSegment on both Windows app and WASM without rebuild.
// Small WASM payloads work; larger ones may deadlock on sync-over-async FlushAsync.
// Restore once BinaryProtocolMode is runtime-configurable in Program.cs.
- //if (IsBrowser && protocolMode == BinaryProtocolMode.AsyncSegment)
+ //if (IsBrowser && options.ProtocolMode == BinaryProtocolMode.AsyncSegment)
// throw new PlatformNotSupportedException(
// "BinaryProtocolMode.AsyncSegment is not supported on WebAssembly. " +
// "Use BinaryProtocolMode.Bytes or BinaryProtocolMode.Segment instead.");
- _options = options;
- _options.BufferWriterChunkSize = 4096;
- _protocolMode = protocolMode;
- _logger = logger;
+ _options = options.SerializerOptions;
+ _options.BufferWriterChunkSize = options.BufferSize;
+ _protocolMode = options.ProtocolMode;
+ _logger = options.Logger;
+ _waitForFlush = options.WaitForFlush;
+ _flushTimeout = options.FlushTimeout;
+ Name = options.Name;
_chunkStates = new ConditionalWeakTable();
if (_logger != null)
{
_logger.LogInformation(
- "AcBinaryHubProtocol initialized mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}",
- _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity,
+ "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} waitForFlush={WaitForFlush} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}",
+ Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity,
+ _waitForFlush, _flushTimeout.TotalMilliseconds,
_options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression);
}
}
@@ -149,18 +192,38 @@ public class AcBinaryHubProtocol : IHubProtocol
set => _options = value;
}
- public string Name => "acbinary";
+ /// Protocol name sent in SignalR handshake. Set via . Default: "acbinary".
+ public string Name { get; } = "acbinary";
+
public int Version => 1;
public TransferFormat TransferFormat => TransferFormat.Binary;
///
/// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask.
/// Fast-path: if already completed (no backpressure), returns directly without Task allocation.
- /// Slow-path: converts to Task for proper blocking when pipe backpressure is active.
+ /// Slow-path: blocks with — throws
+ /// if the flush does not complete within the timeout (protects against slow/stuck/disconnected
+ /// consumers holding the server thread indefinitely).
+ ///
+ /// guarantees _flushTimeout is either
+ /// positive or (which Task.Wait
+ /// natively treats as "wait forever"), so no explicit zero-check is needed here.
+ ///
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static FlushResult SyncFlush(ValueTask vt)
- => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
+ private FlushResult SyncFlush(ValueTask vt)
+ {
+ if (vt.IsCompletedSuccessfully) return vt.Result;
+
+ var task = vt.AsTask();
+
+ if (!task.Wait(_flushTimeout))
+ throw new TimeoutException(
+ $"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " +
+ "consumer may be too slow, stuck, or disconnected.");
+
+ return task.GetAwaiter().GetResult();
+ }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsVersionSupported(int version) => version <= Version;
@@ -473,7 +536,7 @@ public class AcBinaryHubProtocol : IHubProtocol
// --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
if (streamedArg != null)
{
- dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options);
+ dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _waitForFlush, _flushTimeout);
_logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes);
}
diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs
new file mode 100644
index 0000000..6a404cb
--- /dev/null
+++ b/AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs
@@ -0,0 +1,113 @@
+using AyCode.Core.Serializers.Binaries;
+using Microsoft.Extensions.Logging;
+
+namespace AyCode.Services.SignalRs;
+
+///
+/// Configuration for and derived protocols.
+/// Use via services.Configure<AcBinaryHubProtocolOptions>(...) in Program.cs,
+/// or pass an instance directly to the protocol constructor.
+///
+public sealed class AcBinaryHubProtocolOptions
+{
+ // --- Serializer (standalone sub-group, also usable without SignalR via ToBinary/BinaryTo) ---
+
+ /// Binary serializer options. Default: .
+ public AcBinarySerializerOptions SerializerOptions { get; set; } = AcBinarySerializerOptions.Default;
+
+ // --- Transport ---
+
+ /// Wire format and pipeline strategy. Default: .
+ public BinaryProtocolMode ProtocolMode { get; set; } = BinaryProtocolMode.Bytes;
+
+ /// Chunk size for BufferWriterBinaryOutput / AsyncPipeWriterOutput.
+ /// Default: 4096 (aligns with Kestrel's slab size for optimal latency-to-first-byte).
+ public int BufferSize { get; set; } = 4096;
+
+ ///
+ /// Per-chunk flush synchronization in mode.
+ ///
+ /// - true (default): every Grow waits for the previous FlushAsync to complete
+ /// before starting the next chunk. Guarantees end-to-end zero-copy (no owned-buffer fallback)
+ /// and maximum pipeline parallelism. Best for high-throughput servers with fast consumers.
+ /// - false: fire-and-forget flush per chunk; blocks only when committed bytes exceed
+ /// the memory threshold (~60 KB). Itself an adaptive backpressure mode — a fast consumer
+ /// never triggers a wait, a slow consumer naturally throttles through buffer pressure.
+ /// Safer for mixed consumer speeds and memory-sensitive environments.
+ ///
+ /// Ignored for Bytes and Segment modes.
+ ///
+ public bool WaitForFlush { get; set; } = true;
+
+ ///
+ /// Maximum wait for a single synchronous FlushAsync before throwing
+ /// . Protects against slow/stuck/disconnected consumers
+ /// blocking the server thread indefinitely.
+ ///
+ /// Default: 10 seconds. Rationale: AsyncSegment chunks are max 65 KB (UINT16 size field);
+ /// even GPRS-class connections (~60 Kbit/s) transfer 65 KB in ~9 s. If a flush exceeds 10 s,
+ /// the consumer is effectively stuck — faster failure detection is preferable to indefinite blocking.
+ ///
+ ///
+ /// Note: complementary to SignalR's connection-level timeouts (ClientTimeoutInterval,
+ /// KeepAliveInterval). This is a per-operation guard; SignalR timeouts manage the overall
+ /// connection lifetime. Set FlushTimeout < ClientTimeoutInterval so this guard fires first.
+ ///
+ /// Set to to disable.
+ ///
+ public TimeSpan FlushTimeout { get; set; } = TimeSpan.FromSeconds(10);
+
+ // --- Identity ---
+
+ /// Protocol name in the SignalR handshake. Client and server must match. Default: "acbinary".
+ public string Name { get; set; } = "acbinary";
+
+ // --- Diagnostics ---
+
+ /// Optional logger. When null, no protocol-internal logging occurs.
+ public ILogger? Logger { get; set; }
+
+ // --- Validation ---
+
+ ///
+ /// Validates option values and platform compatibility.
+ /// Throws / on invalid values,
+ /// or for unsupported platform/mode combinations.
+ ///
+ public void Validate()
+ {
+ // NOTE: WASM + AsyncSegment send-path guard is currently commented out in the protocol
+ // constructor for testing. Once BinaryProtocolMode becomes runtime-configurable in
+ // Program.cs, this validation will be re-enabled here as the primary guard.
+ //if (OperatingSystem.IsBrowser() && ProtocolMode == BinaryProtocolMode.AsyncSegment)
+ // throw new PlatformNotSupportedException(
+ // "BinaryProtocolMode.AsyncSegment is not supported on WebAssembly. " +
+ // "Use BinaryProtocolMode.Bytes or BinaryProtocolMode.Segment instead.");
+
+ if (BufferSize < 256 || BufferSize > AsyncPipeWriterOutput.MaxChunkSize)
+ throw new ArgumentOutOfRangeException(nameof(BufferSize), BufferSize,
+ $"BufferSize must be between 256 and {AsyncPipeWriterOutput.MaxChunkSize} (UINT16 max).");
+
+ if (FlushTimeout <= TimeSpan.Zero && FlushTimeout != System.Threading.Timeout.InfiniteTimeSpan)
+ throw new ArgumentOutOfRangeException(nameof(FlushTimeout), FlushTimeout,
+ "FlushTimeout must be positive, or Timeout.InfiniteTimeSpan to disable.");
+
+ if (string.IsNullOrWhiteSpace(Name))
+ throw new ArgumentException("Name cannot be null or whitespace.", nameof(Name));
+ }
+
+ ///
+ /// Creates a shallow copy. Required for DI IOptions<T> integration —
+ /// the singleton from DI must not be mutated by per-connection configure callbacks.
+ ///
+ public AcBinaryHubProtocolOptions Clone() => new()
+ {
+ SerializerOptions = SerializerOptions,
+ ProtocolMode = ProtocolMode,
+ BufferSize = BufferSize,
+ WaitForFlush = WaitForFlush,
+ FlushTimeout = FlushTimeout,
+ Name = Name,
+ Logger = Logger
+ };
+}
diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs
index b34b495..aaeb2ac 100644
--- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs
+++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs
@@ -24,8 +24,22 @@ namespace AyCode.Services.SignalRs;
///
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
{
- public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
- public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { }
+ ///
+ /// Parameterless constructor — creates the protocol with all-default options. See base class.
+ ///
+ public AyCodeBinaryHubProtocol() : base() { }
+
+ ///
+ /// Legacy constructor — delegates to the base legacy constructor, which wraps into
+ /// . Kept for backward compatibility.
+ ///
+ public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null)
+ : base(options, protocolMode, logger) { }
+
+ ///
+ /// Primary constructor — accepts a fully-configured .
+ ///
+ public AyCodeBinaryHubProtocol(AcBinaryHubProtocolOptions options) : base(options) { }
#region Wire header (per-message)
diff --git a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md
index ebec83d..b740daa 100644
--- a/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md
+++ b/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL.md
@@ -150,25 +150,48 @@ Typical overhead for 225KB payload with 4096-byte segments: ~224.5KB zero-copy,
> Known issues: `AyCode.Core/docs/BINARY_ISSUES.md`
-## Config
+## Configuration
-| Property | Default | SignalR | Purpose |
-|----------|---------|---------|---------|
-| `Options` | `AcBinarySerializerOptions.Default` | — | Serializer options (volatile, runtime-replaceable) |
-| `BufferWriterChunkSize` | 65536 | 4096 | Chunk size for BWO. SignalR sets 4096 in `AcBinaryHubProtocol` constructor. |
-| `InitialBufferCapacity` | 16384 | — | Starting buffer for `ArrayBinaryOutput` (byte[] serialize path) |
+Hub protocol settings are controlled via **`AcBinaryHubProtocolOptions`** (mutable class). Pass directly to the protocol constructor, or configure via DI in `Program.cs` with `services.Configure(opts => …)`.
+
+| Property | Default | Purpose |
+|----------|---------|---------|
+| `SerializerOptions` | `AcBinarySerializerOptions.Default` | Binary serializer options (also usable standalone via `ToBinary`/`BinaryTo`). |
+| `ProtocolMode` | `Bytes` | Wire format and pipeline strategy — see **BinaryProtocolMode** below. |
+| `BufferSize` | 4096 | Per-chunk size. 4 KB aligns with Kestrel's slab. Max 65535 (UINT16). |
+| `WaitForFlush` | `true` | AsyncSegment flush strategy — see trade-off below. |
+| `FlushTimeout` | 10 s | Per-flush wait limit. `Timeout.InfiniteTimeSpan` = disabled. |
+| `Name` | `"acbinary"` | SignalR handshake protocol name. Client and server must match. |
+| `Logger` | `null` | Optional `ILogger`; injected from DI when registered. |
+
+Inner `AcBinarySerializerOptions` defaults relevant for SignalR: `UseGeneratedCode=true` (hybrid source-gen + reflection), `UseStringInterning=All`, `InitialBufferCapacity=16384`.
+
+### `WaitForFlush` (AsyncSegment-only)
+
+| Value | Pro | Con |
+|-------|-----|-----|
+| **`true`** (default) | Max pipeline parallelism + guaranteed end-to-end zero-copy on send. | Slow consumer propagates back as server-thread blocking (bounded by `FlushTimeout`). |
+| **`false`** | Adaptive — fire-and-forget per chunk, blocks only when ~60 KB memory threshold is hit. | Under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk. |
+
+### `FlushTimeout` rationale (10 s default)
+
+- AsyncSegment chunks are ≤ 65 KB (UINT16). Even GPRS-class links (~60 Kbit/s) transfer 65 KB in ~9 s — so any flush exceeding 10 s indicates a genuinely stuck consumer.
+- **Pro**: fast failure detection; server thread never blocks indefinitely.
+- **Con**: an unusually slow but otherwise healthy consumer will be disconnected — tune up for satellite / throttled links.
+- Complementary to SignalR's connection-level timeouts (`ClientTimeoutInterval`, `KeepAliveInterval`). Set `FlushTimeout < ClientTimeoutInterval` so this per-operation guard fires first.
+- Set to `Timeout.InfiniteTimeSpan` to fully disable (legacy behavior).
## BinaryProtocolMode
`enum BinaryProtocolMode` — constructor parameter for `AcBinaryHubProtocol`, selects serialization + transport strategy:
-| Value | Serialize | Deserialize | Characteristics |
-|-------|-----------|-------------|-----------------|
-| `Bytes` (default) | `ArrayBinaryOutput` → `byte[]` → write to pipe as raw blob | `SequenceReader.ToArray()` → `ArrayBinaryInput` (single contiguous buffer, `TryAdvanceSegment` → false, JIT-eliminated) | Fastest individual ser/deser. No zerocopy. No pipeline overlap. |
-| `Segment` | `BufferWriterBinaryOutput` → directly to `PipeWriter`, chunk-by-chunk, single `Flush` at end | `SequenceBinaryInput` → multi-segment `ReadOnlySequence` (lazy `TryGet` iteration, cross-boundary scratch) | Zerocopy write. No pipeline overlap. |
-| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing via `PipeWriter`, per-chunk `FlushAsync().Forget()` | `PipeReaderBinaryInput` from internal `Pipe` → background `Task` deser, on-demand `ReadAsync` | Zerocopy write + pipeline parallelism (ser/network/deser overlap). Max chunk: 65535 bytes (UINT16). |
+| Value | Serialize | Deserialize (non-WASM) | Pro / Con |
+|-------|-----------|----------------------|-----------|
+| `Bytes` (default) | `ArrayBinaryOutput` → `byte[]` → write to pipe as raw blob | `ArrayBinaryInput` (single contiguous buffer via `MemoryMarshal.TryGetArray` zero-copy / pool-rent). | **Pro**: simplest, fastest per-call, WASM-safe on both sides. **Con**: no zero-copy write, no pipeline overlap. |
+| `Segment` | `BufferWriterBinaryOutput` → directly to `PipeWriter`, chunk-by-chunk, single `Flush` at end | Same as Bytes (unified `ArrayBinaryInput` receive path — `_protocolMode` affects send only). | **Pro**: zero-copy write, WASM-safe. **Con**: no pipeline overlap — receiver must wait for full payload before deser starts. |
+| `AsyncSegment` | `AsyncPipeWriterOutput` → self-describing chunked framing `[201][UINT16 size][data]` per chunk, per-chunk `FlushAsync` with timeout-bounded sync-await | `SegmentBufferReader` (growing contiguous byte[]) + `SegmentBufferReaderInput`; background `Task.Run` deserializes while chunks arrive. WASM: synchronous deser on `CHUNK_END`. | **Pro**: zero-copy write + pipeline parallelism (ser / network / deser overlap). **Con**: send-side not WASM-compatible (see below); slow consumer propagates as server-thread blocking (bounded by `FlushTimeout`). Max chunk: 65535 bytes. |
-In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA — `[201][UINT16 size][data]` per chunk (written by `AsyncPipeWriterOutput` with 3-byte header reservation, zero-copy), (3) CHUNK_END — `[202]` (1 byte, no data). The receiver's `TryParseMessage` enters chunk accumulation mode after CHUNK_START, feeding data to an internal `Pipe` where a background `Task` deserializes via `PipeReaderBinaryInput`.
+In `AsyncSegment` mode, `WriteMessage` dispatches to `WriteMessageChunked` which sends: (1) CHUNK_START — standard SignalR framing `[INT32 len][200][original message with INT32 -1 for streamed arg]`, (2) N x CHUNK_DATA — `[201][UINT16 size][data]` per chunk (zero-copy via `PipeWriter.Advance` with 3-byte header reservation), (3) CHUNK_END — `[202]` (1 byte). The receiver's `TryParseChunkData` accumulates into a `SegmentBufferReader`; on non-WASM platforms a background `Task.Run` deserializes in parallel via `SegmentBufferReaderInput`, on WASM the deserializer runs synchronously on `CHUNK_END` over the already-buffered data.
In `Bytes` and `Segment` mode, the standard `WriteMessage` path is used.
@@ -176,7 +199,7 @@ In `Bytes` and `Segment` mode, the standard `WriteMessage` path is used.
The send and receive paths handle WASM (`OperatingSystem.IsBrowser()`) asymmetrically — **send** is strictly bound to `_protocolMode`, **receive** adapts to the wire format and falls back to a synchronous path only when the platform cannot support the optimal strategy.
-- **Send path**: `AsyncSegment` is **not supported on WebAssembly**. The constructor throws `PlatformNotSupportedException` if `IsBrowser && protocolMode == AsyncSegment` (the `AsyncPipeWriterOutput.SyncAwaitFlush` sync-over-async pattern would block the single UI thread). WASM clients must use `Bytes` or `Segment`.
+- **Send path**: `AsyncSegment` is **not supported on WebAssembly**. The `AcBinaryHubProtocolOptions.Validate()` method throws `PlatformNotSupportedException` if `IsBrowser && ProtocolMode == AsyncSegment` (the `AsyncPipeWriterOutput.SyncAwaitFlush` sync-over-async pattern would block the single UI thread). WASM clients must use `Bytes` or `Segment`. *(Note: this guard is currently commented out in `Validate()` to enable hybrid Windows-app + WASM testing against a single protocol instance. Will be re-enabled once the options are fully wired through `Program.cs`.)*
- **Receive path**: works on WASM with **any** server-side mode (including `AsyncSegment` → chunked wire). `TryParseChunkData` detects the platform at runtime:
- **Non-browser**: first `CHUNK_DATA` spawns a background `Task.Run` over a `SegmentBufferReader` (pipeline parallelism — serialize / network / deserialize overlap). `CHUNK_END` awaits the task's result.
- **Browser**: the background task is skipped. Chunks accumulate in `SegmentBufferReader`; on `CHUNK_END` the buffer is `Complete()`d and the deserializer runs synchronously on the current thread. `SegmentBufferReaderInput.TryAdvanceSegment` sees `_completed=true` and never calls `ManualResetEventSlim.Wait()` (which throws `PlatformNotSupportedException` on WASM).