AyCode.Core/AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL
Loretta 97ac3e21a3 [LOADED_DOCS: 3 files, no new loads]
Remove SegmentBufferReader; unify on AsyncPipeReaderInput

Migrates all SignalR chunked streaming receive logic to AsyncPipeReaderInput, fully removing SegmentBufferReader and SegmentBufferReaderInput from the codebase. Updates all references, deserialization paths, and documentation to reflect the new unified primitive. Marks ADR-0003 as accepted (partially executed), closes related TODOs, and clarifies protocol docs. Sets DoubleBuffered as the default FlushPolicy. No wire format or behavioral changes; all tests pass.
2026-05-03 15:21:15 +02:00
..
README.md [LOADED_DOCS: 3 files, no new loads] 2026-05-03 15:21:15 +02:00
SIGNALR_BINARY_PROTOCOL_ISSUES.md [LOADED_DOCS: 3 files, no new loads] 2026-05-03 15:21:15 +02:00
SIGNALR_BINARY_PROTOCOL_TODO.md [LOADED_DOCS: 3 files, no new loads] 2026-05-03 15:21:15 +02:00

README.md

SignalR Binary Protocol

AcBinaryHubProtocol (unsealed base) — custom IHubProtocol (name: "acbinary") replacing SignalR JSON+Base64 with AcBinarySerializer. General binary framing only — no consumer-specific logic.

AyCodeBinaryHubProtocol (derived) — project-specific consumer logic: SignalParams capture via OnArgumentRead, IsRawBytesData path, SignalDataType type resolution in ReadSingleArgument override.

Architecture (tag system, dispatch, request/response): SIGNALR.md Output writers (cached chunk, buffer states, chunk sizing): AyCode.Core/AyCode.Core/docs/BINARY/BINARY_WRITERS.md

Wire Format

[INT32 LE payload length] [1 byte message type] [message-specific fields]
Type Byte Fields
Invocation 1 nullable invocationId, target, arguments, streamIds, headers
StreamItem 2 invocationId, argument, headers
Completion 3 invocationId, nullable error, hasResult, optional result, headers
StreamInvocation 4 invocationId, target, arguments, streamIds, headers
CancelInvocation 5 invocationId, headers
Ping 6 (empty)
Close 7 nullable error, allowReconnect
Ack 8 sequenceId (INT64)
Sequence 9 sequenceId (INT64)

Argument framing: [INT32 LE arg length] [serialized bytes] — deferred deserialization (target parsed first → IInvocationBinder resolves types).

Strings: [VarUInt byte length] [UTF-8]. Nullable: 0=null, 1=value follows.

Headers: [VarUInt count] [key-value pairs...]. Count=0 → no headers.

Zero-Copy Write Pipeline

Writes directly to SignalR pipe's IBufferWriter<byte>, no intermediate byte[].

WriteMessage(HubMessage, IBufferWriter<byte> output)
├─ Reserve 4-byte outer length prefix
├─ BWO = BufferWriterBinaryOutput(output)  [standalone mode]
│  ├─ WriteByte(messageType)
│  ├─ WriteStringUtf8(invocationId, target)
│  ├─ WriteVarUInt(argCount)
│  ├─ Per argument:
│  │  ├─ byte[] (AcBinary) → raw length + bytes (no 0x44 wrapper — first byte is AcBinary FormatVersion 0x01, no VarUInt)
│  │  ├─ byte[] (other)    → tag (0x44) + raw bytes (no VarUInt — argLength implies size)
│  │  └─ object → FlushAndReset() → reserve INT32 arg prefix
│  │     → AcBinarySerializer.Serialize(value, output) → patch prefix
│  ├─ WriteStringArray(streamIds)
│  └─ WriteHeaders(headers)
├─ Patch outer length = BWO.Position + externalBytes
└─ BWO.Flush()

byte[] Write: isAcBinary Detection

When argument is byte[], the protocol checks if it's already AcBinary-serialized data:

var isAcBinary = byteArray.Length >= 2
    && byteArray[0] == AcBinarySerializerOptions.FormatVersion  // 1
    && (byteArray[1] & 0xF0) == BinaryTypeCode.HeaderFlagsBase; // 0x90
  • isAcBinary = true: [argLength=N][raw AcBinary bytes] — no 0x44 wrapper tag; first byte is the AcBinary FormatVersion 0x01 (the discriminator the receiver uses), reader deserializes directly
  • isAcBinary = false: [argLength=1+N][0x44 tag][raw bytes] — tag for type detection, no VarUInt (argLength implies size)

Dual BWO Pattern

Protocol and serializer each create own BufferWriterBinaryOutput on the same IBufferWriter. Sequential, never concurrent:

  1. Protocol BWO (standalone): framing — message type, strings, headers. Cached chunk, zero dispatch.
  2. FlushAndReset(): commits bytes to pipe, invalidates chunk.
  3. Serializer BWO (context mode): Serialize() creates internal BWO, acquires fresh chunk, writes, flushes.
  4. Protocol BWO re-acquires chunk on next write (via Grow).

Cost: one extra GetMemory per argument (nanoseconds). Benefit: zero-copy end-to-end, no intermediate byte[], no wrapper class.

Why two BWOs: serializer writes must live on BinarySerializationContext (sealed class) for JIT optimization — context owns its own BWO. See AyCode.Core/AyCode.Core/docs/BINARY/BINARY_WRITERS.md § "Why Writes Are on the Context".

Length Prefix Patching

var lengthSpan = output.GetSpan(4);
output.Advance(4);
// ... write payload ...
Unsafe.WriteUnaligned(ref lengthSpan[0], payloadLength);

Safe for PipeWriter — segments writable until FlushAsync.

GetMessageBytes caveat: ArrayBufferWriter initial capacity must include LengthPrefixSize to prevent resize after prefix reservation (stale span).

Read: Argument Deserialization

Base AcBinaryHubProtocol.ReadSingleArgument reads [INT32 argLength] [argBytes] from the pipe's ReadOnlySequence via SequenceReader<byte>:

ReadSingleArgument(SequenceReader, targetType):
  Read INT32 argLength
  if argLength == 0 → return null
  if argLength == 1 && first byte == 0 → return null  (null marker)

  argSlice = UnreadSequence.Slice(0, argLength)  — zero-copy reference
  Advance(argLength)

  1. byte[] fast-path:
     if first byte == BinaryTypeCode.ByteArray (0x44):
       return argSlice.Slice(1) as byte[]  — skip tag, rest is raw payload
       No VarUInt — argLength implies size

  2. Default: DeserializeFromSequence(argSlice, targetType, options)

AyCodeBinaryHubProtocol.ReadSingleArgument overrides with consumer-specific paths:

  (same argLength/null/slice logic as base)

  1. byte[] fast-path (same as base)

  2. IsRawBytesData path:
     if _currentSignalParams.IsRawBytesData == true:
       return SequenceToByteArray(argSlice)  — entire arg as raw byte[]
       Consumer (DataSource.PopulateMerge) handles deserialization

  3. Typed deserialization:
     if targetType == object && SignalDataType != null:
       resolve Type from SignalDataType (AssemblyQualifiedName)
     DeserializeFromSequence(argSlice, resolvedType, options)

SignalParams Capture

Base AcBinaryHubProtocol.ReadArguments calls OnArgumentRead(value, index) after each argument. AyCodeBinaryHubProtocol overrides this hook to capture SignalParams (arg[2]) for type-aware deserialization of subsequent args (arg[3] = data). Thread-safe: SignalR processes messages sequentially per connection.

SequenceToByteArray

Zero-copy when possible: if single-segment and backing array matches exactly → return the array directly. Otherwise ReadOnlySequence.ToArray().

SequenceBinaryInput (Multi-Segment Deserialization)

struct SequenceBinaryInput : IBinaryInputBase — reads ReadOnlySequence<byte> without linearizing. Lazy iteration via ReadOnlySequence.TryGet — zero ctor alloc, no pre-extracted segment array.

Context _buffer points directly to current segment's backing byte[] (zero-copy). Cross-boundary reads (value straddling segments) copy only affected bytes into ArrayPool-rented scratch. After scratch read, _afterCrossBoundary flag restores context to next segment's backing array.

Typical overhead — 225KB payload, 4096-byte segments: ~224.5KB zero-copy + ~500 bytes scratch at ~55 boundaries. Scratch is rented once (lazy, first boundary) and reused; Release() returns it to ArrayPool.

Known issues: AyCode.Core/AyCode.Core/docs/BINARY/BINARY_ISSUES.md

Configuration

Hub protocol settings via AcBinaryHubProtocolOptions (mutable class). Pass to ctor directly, or configure via DI in Program.cs (services.Configure<AcBinaryHubProtocolOptions>(opts => …)).

Property Default Purpose
SerializerOptions AcBinarySerializerOptions.Default Binary serializer options (also usable standalone via ToBinary/BinaryTo).
ProtocolMode Bytes Wire format and pipeline strategy — see BinaryProtocolMode below.
BufferSize 4096 Per-chunk size. 4 KB aligns with Kestrel's slab. Max 65535 (UINT16).
FlushPolicy Coalesced AsyncSegment flush strategy — see trade-off below.
FlushTimeout 10 s Per-flush wait limit. Timeout.InfiniteTimeSpan = disabled.
Name "acbinary" SignalR handshake protocol name. Client and server must match.
Logger null Optional ILogger; injected from DI when registered.

Inner AcBinarySerializerOptions defaults relevant for SignalR: UseGeneratedCode=true (hybrid source-gen + reflection), UseStringInterning=All, InitialBufferCapacity=16384.

FlushPolicy (AsyncSegment-only)

Value Peak memory Pro Con
FlushPolicy.PerChunk ~chunk × 1 Strictly bounded peak memory regardless of consumer speed; guaranteed end-to-end zero-copy. No producer/flush parallelism — wall-clock = sum of (serialize + flush) per chunk.
FlushPolicy.DoubleBuffered ~chunk × 2 Maximum producer/flush parallelism with bounded memory — wall-clock = max(serialize, flush) × N_chunks. Recommended for balanced streaming. Slow consumer propagates back as server-thread blocking at next Grow (bounded by FlushTimeout).
FlushPolicy.Coalesced (default) ~64 KB per window Batches chunks into ~64 KB flush windows: while a flush is in-flight, new chunks accumulate; when the window fills, one batched FlushAsync covers the whole window. Major throughput win — a 9.5 MB payload at 4 KB chunks fires ~150 flushes (one per window) instead of ~2 300 per-chunk flushes. Especially impactful on transports with non-trivial FlushAsync overhead (network sockets, Kestrel WebSocket, kernel TCP). Per-window peak memory ~64 KB; under heavy backpressure a chunk may fall back to an owned (copied) buffer, losing zero-copy for that chunk.

FlushTimeout rationale (10 s default)

  • AsyncSegment chunks are ≤ 65 KB (UINT16). Even GPRS-class links (~60 Kbit/s) transfer 65 KB in ~9 s — so any flush exceeding 10 s indicates a genuinely stuck consumer.
  • Pro: fast failure detection; server thread never blocks indefinitely.
  • Con: an unusually slow but otherwise healthy consumer will be disconnected — tune up for satellite / throttled links.
  • Complementary to SignalR's connection-level timeouts (ClientTimeoutInterval, KeepAliveInterval). Set FlushTimeout < ClientTimeoutInterval so this per-operation guard fires first.
  • Set to Timeout.InfiniteTimeSpan to fully disable (legacy behavior).

BinaryProtocolMode

enum BinaryProtocolMode — constructor parameter for AcBinaryHubProtocol, selects serialization + transport strategy:

Value Serialize Deserialize (non-WASM) Pro / Con
Bytes (default) ArrayBinaryOutputbyte[] → write to pipe as raw blob ArrayBinaryInput (single contiguous buffer via MemoryMarshal.TryGetArray zero-copy / pool-rent). Pro: simplest, fastest per-call, WASM-safe on both sides. Con: no zero-copy write, no pipeline overlap.
Segment BufferWriterBinaryOutput → directly to PipeWriter, chunk-by-chunk, single Flush at end Same as Bytes (unified ArrayBinaryInput receive path — _protocolMode affects send only). Pro: zero-copy write, WASM-safe. Con: no pipeline overlap — receiver must wait for full payload before deser starts.
AsyncSegment AsyncPipeWriterOutput → self-describing chunked framing [201][UINT16 size][data] per chunk + [202] end marker, per-chunk FlushAsync with timeout-bounded sync-await AsyncPipeReaderInput (growing contiguous byte[] with sliding-window cycle); background Task.Run deserializes while chunks arrive. WASM: synchronous deser on CHUNK_END. Pro: zero-copy write + pipeline parallelism (ser / network / deser overlap). Con: send-side not WASM-compatible (see below); slow consumer propagates as server-thread blocking (bounded by FlushTimeout). Max chunk: 65535 bytes.

In AsyncSegment mode, WriteMessage dispatches to WriteMessageChunked which sends: (1) CHUNK_START — standard SignalR framing [INT32 len][200][original message with INT32 -1 for streamed arg], (2) N x CHUNK_DATA + final CHUNK_END — [201][UINT16 size][data] per chunk + [202] end marker, all emitted by AsyncPipeWriterOutput in framed mode (zero-copy via PipeWriter.Advance with 3-byte header reservation; protocol layer no longer writes its own [202] or extra FlushAsync). The receiver's TryParseChunkData accumulates into an AsyncPipeReaderInput (multiMessage:false — protocol parses [201]/[202] framing externally and feeds raw data via Feed); on non-WASM platforms a background Task.Run deserializes in parallel via AsyncPipeReaderInputAdapter, on WASM the deserializer runs synchronously on CHUNK_END over the already-buffered data.

In Bytes and Segment mode, the standard WriteMessage path is used.

WebAssembly compatibility

Send and receive paths handle WASM (OperatingSystem.IsBrowser()) asymmetrically — send strictly bound to _protocolMode; receive adapts to wire format, falls back synchronously when platform can't support the optimal strategy.

  • Send path: AsyncSegment is not supported on WebAssembly. AcBinaryHubProtocolOptions.Validate() throws PlatformNotSupportedException if IsBrowser && ProtocolMode == AsyncSegment (the AsyncPipeWriterOutput.SyncAwaitFlush sync-over-async pattern would block the single UI thread). WASM clients must use Bytes or Segment.
  • Receive path: works on WASM with any server-side mode (including AsyncSegment → chunked wire). TryParseChunkData detects the platform at runtime:
    • Non-browser: first CHUNK_DATA spawns a background Task.Run over an AsyncPipeReaderInput (pipeline parallelism — serialize / network / deserialize overlap). CHUNK_END awaits the task's result.
    • Browser: the background task is skipped. Chunks accumulate in AsyncPipeReaderInput; on CHUNK_END the buffer is Complete()d and the deserializer runs synchronously on the current thread via the streaming overload AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput, Type, opts). After Complete(), TryAdvanceSegment sees _completed=true and never calls ManualResetEventSlim.Wait() (which throws PlatformNotSupportedException on WASM).

Consequence: mixed topology (desktop server AsyncSegment + WASM client Bytes) works without negotiation or protocol-name variation — client converts incoming chunked wire to its synchronous processing model.

Registration in Program.cs

Server

builder.Services.AddSignalR(hubOptions =>
    {
        hubOptions.EnableDetailedErrors = true;
        hubOptions.MaximumReceiveMessageSize = 30_000_000;
        hubOptions.KeepAliveInterval = TimeSpan.FromSeconds(60);
        hubOptions.ClientTimeoutInterval = TimeSpan.FromSeconds(180);
        hubOptions.StatefulReconnectBufferSize = 30_000_000;
    })
    .AddAcBinaryProtocol(opts =>
    {
        opts.ProtocolMode = BinaryProtocolMode.AsyncSegment;
        // opts.FlushTimeout = TimeSpan.FromSeconds(10);  // default
    });

Client — HubConnectionBuilder as a DI transient

The consumer (e.g. a class derived from AcSignalRClientBase) receives the builder via DI:

services.AddTransient<HubConnectionBuilder>(sp =>
{
    var logger = sp.GetRequiredService<MyLoggerClient>();
    var hubUrl = $"{Config.BaseUrl}/{Config.HubName}";

    var builder = new HubConnectionBuilder()
        .WithUrl(hubUrl, HttpTransportType.WebSockets, options =>
        {
            options.TransportMaxBufferSize = 30_000_000;
            options.ApplicationMaxBufferSize = 30_000_000;
            options.CloseTimeout = TimeSpan.FromSeconds(10);
            options.SkipNegotiation = true;
        })
        .ConfigureLogging(logging =>
        {
            logging.SetMinimumLevel(LogLevel.Information);
            logging.AddAcLogger(_ => logger);
        })
        .WithAutomaticReconnect()
        .WithStatefulReconnect()
        .WithKeepAliveInterval(TimeSpan.FromSeconds(60))
        .WithServerTimeout(TimeSpan.FromSeconds(180));

    builder.AddAcBinaryProtocol(opts =>
    {
        // Desktop / server / native: AsyncSegment for pipeline parallelism.
        // WebAssembly: must be Bytes or Segment (Validate throws on AsyncSegment).
        opts.ProtocolMode = OperatingSystem.IsBrowser()
            ? BinaryProtocolMode.Segment
            : BinaryProtocolMode.AsyncSegment;
    });

    return builder;
});

services.AddSingleton<MySignalRClient>();   // derived from AcSignalRClientBase

Note: AcSignalRClientBase is HubConnectionBuilder-injected and calls only Build() + dispatch wiring internally. All transport/protocol configuration lives in Program.cs — visible, overridable per environment, and identical on both ends of the wire.

  • adr/0001-acbinary-decorator-feature-stack-design.mdAcBinaryHubProtocol optional feature stack — decorator-based composition design (Status: Proposed). Umbrella ADR for optional decorator-based feature stack (encryption, compression with MinSize, OpenTelemetry tracing, HMAC signing). NuGet-competitiveness TODO entries (ACCORE-SBP-T-H7M5 / N9F3 / J5W8 / B3K6 in SIGNALR_BINARY_PROTOCOL_TODO.md) resolve under this umbrella. Leaf ADRs (0002-0005) for per-feature design + threat model.
  • AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.mdAcBinary streaming receive — AsyncPipeReaderInput unified primitive (Status: Accepted (2026-05-03), partially executed — Steps 4 & 5 NamedPipe / FileStream helpers dropped). Repo-level cross-cutting ADR. The receive-side migration delivered: SegmentBufferReader + SegmentBufferReaderInput consolidated into a single AsyncPipeReaderInput class; SignalR's TryParseChunkData migrated (ACCORE-SBP-T-G7T2). The unified AsyncSegment chunked wire format ([INT32 length][200 CHUNK_START][201][UINT16 size][data][202 CHUNK_END]) documented in this README's "Wire Format" / "BinaryProtocolMode" sections is preserved verbatim. Transport-agnostic helpers (NamedPipe / FileStream wrappers — Steps 4 & 5) were dropped during execution: the framework stays consumer-implements-transport (only PipeWriter / PipeReader primitives exposed); see ADR-0003's "Status" section for the rationale.

Source: AyCode.Services/SignalRs/AcBinaryHubProtocol.cs (base), AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs (consumer logic), AyCode.Services/SignalRs/BinaryProtocolMode.cs (enum), AyCode.Services/SignalRs/AcBinaryHubProtocolOptions.cs (options), AyCode.Services/SignalRs/AcSignalRProtocolExtensions.cs (DI extensions)