using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Text; using AyCode.Core.Extensions; using AyCode.Core.Serializers.Binaries; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Protocol; using Microsoft.Extensions.Logging; namespace AyCode.Services.SignalRs; /// /// Custom SignalR hub protocol using AcBinarySerializer for wire format. /// Eliminates JSON+Base64 overhead by serializing all HubMessages directly to binary. /// /// Wire format per message: /// [4 bytes: payload length (little-endian)] [payload bytes] /// /// Payload structure: /// [1 byte: message type] [message-specific fields serialized via AcBinary] /// /// Message types map 1:1 to SignalR HubMessageType values. /// Arguments are serialized individually with an INT32 length prefix each, /// enabling deferred deserialization via IHubProtocol's binder pattern. /// /// Write path: BufferWriterBinaryOutput for zero virtual dispatch on the hot path. /// Argument payloads serialized directly to the pipe via AcBinarySerializer (zero-copy write). /// /// Read path: SequenceReader<byte> reads directly from the pipe's ReadOnlySequence. /// Argument deserialization uses the pipe's backing byte[] via TryGetArray (zero-copy read). /// public class AcBinaryHubProtocol : IHubProtocol { private const int LengthPrefixSize = 4; // Message type markers (matching HubMessageType enum values) private const byte MsgInvocation = 1; private const byte MsgStreamItem = 2; private const byte MsgCompletion = 3; private const byte MsgStreamInvocation = 4; private const byte MsgCancelInvocation = 5; private const byte MsgPing = 6; private const byte MsgClose = 7; private const byte MsgAck = 8; private const byte MsgSequence = 9; // Chunked protocol framing for AsyncSegment mode private const byte MsgAsyncChunkStart = 200; private const byte MsgAsyncChunkData = 201; private const byte MsgAsyncChunkEnd = 202; /// /// CHUNK_ABORT marker. Emitted by the sender's if the /// streamed-arg serialize fails mid-stream after CHUNK_START has been sent — instead of letting /// the exception propagate (which would abort the entire SignalR transport connection in /// HubConnectionContext.WriteCore, killing all other in-flight invocations on the same /// WebSocket), the sender writes a single [203] byte so the receiver can fault the /// pending invocation cleanly while the transport stays alive (fault isolation: /// blast radius = one message). Recognised by , which surfaces /// the abort to the awaiting caller via a synthesised . /// private const byte MsgAsyncChunkAbort = 203; /// Sentinel object placed in the args array for the streamed argument (replaced after chunk deserialization). protected static readonly object StreamedArgPlaceholder = new(); /// /// True when running on a browser (WebAssembly) runtime. Cached at type-load because /// the value is invariant per process and the check is used on hot paths. /// /// Browser implications: /// /// Send path: AsyncSegment is unsupported (sync-over-async flush blocks the single UI thread). /// Receive path: when chunked wire arrives, background Task.Run is skipped; /// the deserializer runs synchronously on CHUNK_END over the already-buffered /// . After Complete(), the input's /// TryAdvanceSegment never blocks on ManualResetEventSlim.Wait() (which would /// throw on WASM) — it returns buffered data /// immediately and signals end-of-stream when exhausted. /// /// /// private static readonly bool IsBrowser = OperatingSystem.IsBrowser(); protected volatile AcBinarySerializerOptions _options; protected readonly BinaryProtocolMode _protocolMode; protected readonly ILogger? _logger; /// /// AsyncSegment per-chunk flush synchronization — see . /// protected readonly FlushPolicy _flushPolicy; /// /// Per-flush wait limit — see . /// Guaranteed positive or by . /// protected readonly TimeSpan _flushTimeout; /// /// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly). /// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server. /// private readonly ConditionalWeakTable _chunkStates; private sealed class AsyncChunkState { public HubMessage PartialMessage = null!; public object?[] Args = null!; public int StreamedArgIndex; public Type StreamedArgType = null!; public AsyncPipeReaderInput Input = null!; public Task? DeserTask; /// /// Per-binder header context — the opaque object returned by for /// the currently chunked message. Persisted across CHUNK_START → CHUNK_DATA × N → CHUNK_END /// boundaries inside the per-binder entry, so derived classes /// can consume it during chunked deserialization without sharing state across connections. /// public object? HeaderContext; /// /// Total bytes of chunk frame data already consumed from the input stream /// (including [201][UINT16] framing headers + data bytes). /// Used to skip already-processed chunks when SignalR re-presents the buffer /// after a false-returning TryParseMessage call. /// public int ChunkFrameBytesConsumed; } /// /// Parameterless constructor — creates the protocol with all-default options /// (, 4 KB buffer, 10 s flush timeout, "acbinary" name). /// Mainly for tests and simple scenarios. For production, pass an explicit /// or configure via DI. /// public AcBinaryHubProtocol() : this(new AcBinaryHubProtocolOptions()) { } /// /// Primary constructor. All configuration flows through . /// Invalid configuration (incl. WebAssembly + AsyncSegment send-path) throws from /// . /// public AcBinaryHubProtocol(AcBinaryHubProtocolOptions options) { if (options is null) throw new ArgumentNullException(nameof(options)); options.Validate(); _options = options.SerializerOptions; _options.BufferWriterChunkSize = options.BufferSize; _protocolMode = options.ProtocolMode; _logger = options.Logger; _flushPolicy = options.FlushPolicy; _flushTimeout = options.FlushTimeout; Name = options.Name; _chunkStates = new ConditionalWeakTable(); _logger?.LogInformation( "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} flushPolicy={FlushPolicy} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", Name, _protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, _flushPolicy, _flushTimeout.TotalMilliseconds, _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); } /// /// Runtime-replaceable serializer options. /// Thread-safe: uses volatile field, callers see the new options on next message. /// public AcBinarySerializerOptions Options { get => _options; set => _options = value; } /// Protocol name sent in SignalR handshake. Set via . Default: "acbinary". public string Name { get; } = "acbinary"; public int Version => 1; public TransferFormat TransferFormat => TransferFormat.Binary; /// /// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask. /// Fast-path: if already completed (no backpressure), returns directly without Task allocation. /// Slow-path: blocks with — throws /// if the flush does not complete within the timeout (protects against slow/stuck/disconnected /// consumers holding the server thread indefinitely). /// /// guarantees _flushTimeout is either /// positive or (which Task.Wait /// natively treats as "wait forever"), so no explicit zero-check is needed here. /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private FlushResult SyncFlush(ValueTask vt) { if (vt.IsCompletedSuccessfully) return vt.Result; var task = vt.AsTask(); return task.Wait(_flushTimeout) ? task.GetAwaiter().GetResult() : throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected."); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsVersionSupported(int version) => version <= Version; #region Extensibility Hooks /// /// Called right after the message type byte (both chunked and non-chunked paths). /// Derived protocols can write extra header fields here (e.g., a type AQN for untyped args). /// /// Default implementation writes nothing — base protocol is fully generic and has no per-message /// extra state. Derived classes must read exactly the same bytes in . /// /// /// Output writer (same one used for the message payload). /// The message being written. /// /// When the chunked path activates, this is the actual argument being streamed (so the derived /// class can use its concrete runtime type). null for non-chunked messages. /// protected virtual void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg) { // Base: no extra header. } /// /// Reads the per-message header written by on the sender side. /// Called right after the message type byte has been consumed. /// /// Returns an opaque context object that is stored in /// for derived classes to consume during the rest of the parse. /// Default implementation returns null. /// /// protected virtual object? ReadHeader(ref SequenceReader r) => null; #endregion #region WriteMessage public ReadOnlyMemory GetMessageBytes(HubMessage message) { // +LengthPrefixSize: prevents ArrayBufferWriter resize on first GetMemory, // which would invalidate the length prefix span obtained before Advance. var writer = new ArrayBufferWriter(_options.BufferWriterChunkSize + LengthPrefixSize); WriteMessage(message, writer); return writer.WrittenMemory; } public void WriteMessage(HubMessage message, IBufferWriter output) { _logger?.LogInformation("Serialize start"); // AsyncSegment: chunked protocol framing for messages with streamable arguments if (_protocolMode == BinaryProtocolMode.AsyncSegment && output is PipeWriter pipeWriter && HasStreamableArgs(message)) { WriteMessageChunked(message, pipeWriter); return; } // Reserve outer length prefix directly on the pipe (before BWO takes over) var lengthSpan = output.GetSpan(LengthPrefixSize); output.Advance(LengthPrefixSize); var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize); var externalBytes = 0; switch (message) { case InvocationMessage m: WriteInvocation(ref bw, output, m, ref externalBytes); break; case StreamInvocationMessage m: WriteStreamInvocation(ref bw, output, m, ref externalBytes); break; case StreamItemMessage m: WriteStreamItem(ref bw, output, m, ref externalBytes); break; case CompletionMessage m: WriteCompletion(ref bw, output, m, ref externalBytes); break; case CancelInvocationMessage m: WriteCancelInvocation(ref bw, m); break; case PingMessage: bw.WriteByte(MsgPing); break; case CloseMessage m: WriteClose(ref bw, m); break; case AckMessage m: bw.WriteByte(MsgAck); bw.WriteRaw(m.SequenceId); break; case SequenceMessage m: bw.WriteByte(MsgSequence); bw.WriteRaw(m.SequenceId); break; default: throw new HubException($"Unexpected message type: {message.GetType().Name}"); } var totalPayload = bw.Position + externalBytes; bw.Flush(); Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); _logger?.LogInformation("Serialize end totalSentSize={TotalSentSize}", LengthPrefixSize + totalPayload); if (_logger?.IsEnabled(LogLevel.Debug) == true) _logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload); } private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, InvocationMessage m, ref int externalBytes) { bw.WriteByte(MsgInvocation); WriteHeader(ref bw, m, streamedArg: null); WriteNullableString(ref bw, m.InvocationId); bw.WriteStringUtf8(m.Target); WriteArguments(ref bw, output, m.Arguments, ref externalBytes); WriteStringArray(ref bw, m.StreamIds); WriteHeaders(ref bw, m.Headers); } private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, StreamInvocationMessage m, ref int externalBytes) { bw.WriteByte(MsgStreamInvocation); WriteHeader(ref bw, m, streamedArg: null); bw.WriteStringUtf8(m.InvocationId!); bw.WriteStringUtf8(m.Target); WriteArguments(ref bw, output, m.Arguments, ref externalBytes); WriteStringArray(ref bw, m.StreamIds); WriteHeaders(ref bw, m.Headers); } private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter output, StreamItemMessage m, ref int externalBytes) { bw.WriteByte(MsgStreamItem); WriteHeader(ref bw, m, streamedArg: null); bw.WriteStringUtf8(m.InvocationId!); WriteArgument(ref bw, output, m.Item, ref externalBytes); WriteHeaders(ref bw, m.Headers); } private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter output, CompletionMessage m, ref int externalBytes) { bw.WriteByte(MsgCompletion); WriteHeader(ref bw, m, streamedArg: null); bw.WriteStringUtf8(m.InvocationId!); WriteNullableString(ref bw, m.Error); var hasResult = m.HasResult; bw.WriteByte(hasResult ? (byte)1 : (byte)0); if (hasResult) WriteArgument(ref bw, output, m.Result, ref externalBytes); WriteHeaders(ref bw, m.Headers); } private static void WriteCancelInvocation(ref BufferWriterBinaryOutput bw, CancelInvocationMessage m) { bw.WriteByte(MsgCancelInvocation); bw.WriteStringUtf8(m.InvocationId!); WriteHeaders(ref bw, m.Headers); } private static void WriteClose(ref BufferWriterBinaryOutput bw, CloseMessage m) { bw.WriteByte(MsgClose); WriteNullableString(ref bw, m.Error); bw.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0); } #endregion #region Chunked Protocol (AsyncSegment write) /// /// Returns true if the message has arguments that should be streamed via chunked protocol. /// Only non-null, non-byte[] arguments go through the chunked path. /// private static bool HasStreamableArgs(HubMessage message) => message switch { InvocationMessage m => HasNonByteArrayArg(m.Arguments), StreamInvocationMessage m => HasNonByteArrayArg(m.Arguments), StreamItemMessage m => m.Item != null && m.Item is not byte[], CompletionMessage m => m.HasResult && m.Result != null && m.Result is not byte[], _ => false }; private static bool HasNonByteArrayArg(object?[] args) { for (var i = args.Length - 1; i >= 0; i--) { if (args[i] != null && args[i] is not byte[]) return true; } return false; } /// /// Gets the last non-null, non-byte[] argument value and its index for streaming. /// private static (object? value, int index) GetStreamedArg(HubMessage message) => message switch { InvocationMessage m => GetLastNonByteArrayArg(m.Arguments), StreamInvocationMessage m => GetLastNonByteArrayArg(m.Arguments), StreamItemMessage m => (m.Item, 0), CompletionMessage m => (m.Result, 0), _ => (null, -1) }; private static (object? value, int index) GetLastNonByteArrayArg(object?[] args) { for (var i = args.Length - 1; i >= 0; i--) { if (args[i] != null && args[i] is not byte[]) return (args[i], i); } return (null, -1); } /// /// Writes a message using chunked protocol framing for AsyncSegment mode. /// The two phases: /// /// CHUNK_START envelope — standard SignalR framed message /// ([INT32 length][200 marker][header][args except streamedArg as INT32 -1]), /// written here via . /// CHUNK_DATA + CHUNK_END — fully owned by /// (invoked through ): /// emits [201][UINT16 size][data] per chunk + [202] end marker + final /// FlushAsync. This protocol layer no longer writes [201]/[202] /// bytes or calls FlushAsync after the streamed-arg serialize — those are the /// streaming primitive's responsibility (see BINARY_ASYNCPIPE docs). /// /// For streamedArg == null, still /// drives in framed mode — wire is /// [201][UINT16=1][Null][202], deserializing back to null. No special-casing /// needed in this layer. /// private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter) { var (streamedArg, streamedArgIndex) = GetStreamedArg(message); if (_logger?.IsEnabled(LogLevel.Debug) == true) _logger.LogDebug("WriteMessageChunked {MessageType} streamedArgIndex={StreamedArgIndex} streamedArgType={StreamedArgType}", message.GetType().Name, streamedArgIndex, streamedArg?.GetType().Name ?? "null"); int chunkStartPayload; var dataBytes = 0; // --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) --- { var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize); pipeWriter.Advance(LengthPrefixSize); var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize); var externalBytes = 0; bw.WriteByte(MsgAsyncChunkStart); // Write original message body with INT32 -1 for the streamed arg switch (message) { case InvocationMessage m: bw.WriteByte(MsgInvocation); WriteHeader(ref bw, m, streamedArg); WriteNullableString(ref bw, m.InvocationId); bw.WriteStringUtf8(m.Target); WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes); WriteStringArray(ref bw, m.StreamIds); WriteHeaders(ref bw, m.Headers); break; case StreamInvocationMessage m: bw.WriteByte(MsgStreamInvocation); WriteHeader(ref bw, m, streamedArg); bw.WriteStringUtf8(m.InvocationId!); bw.WriteStringUtf8(m.Target); WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes); WriteStringArray(ref bw, m.StreamIds); WriteHeaders(ref bw, m.Headers); break; case StreamItemMessage m: bw.WriteByte(MsgStreamItem); WriteHeader(ref bw, m, streamedArg); bw.WriteStringUtf8(m.InvocationId!); bw.WriteRaw(-1); // streamed arg marker WriteHeaders(ref bw, m.Headers); break; case CompletionMessage m: bw.WriteByte(MsgCompletion); WriteHeader(ref bw, m, streamedArg); bw.WriteStringUtf8(m.InvocationId!); WriteNullableString(ref bw, m.Error); bw.WriteByte(1); // hasResult = true bw.WriteRaw(-1); // streamed arg marker WriteHeaders(ref bw, m.Headers); break; } chunkStartPayload = bw.Position + externalBytes; bw.Flush(); Unsafe.WriteUnaligned(ref lengthSpan[0], chunkStartPayload); _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload); } SyncFlush(pipeWriter.FlushAsync()); // --- CHUNK_DATA + CHUNK_END (fully delegated to AsyncPipeWriterOutput) --- // AsyncPipeWriterOutput in framed mode owns the entire chunked-stream emission: // - [201][UINT16 size][data] per chunk // - [202] CHUNK_END marker // - final FlushAsync // This includes the null streamedArg case (since the AcBinarySerializer null-bypass for // multiMessage=true was removed) — wire is [201][UINT16=1][Null][202], deserialized back to null. // No manual [202] write or extra FlushAsync needed in this layer. // // Fault isolation: if the streamed-arg serialize throws (e.g. a property getter NRE on the // receiver's data class), CHUNK_START has already been sent — the receiver is in chunk-state // waiting for [201]/[202]. Letting the exception propagate would abort the entire SignalR // transport connection in HubConnectionContext.WriteCore (killing all other in-flight // invocations on the same WebSocket). Instead, emit an explicit [203] CHUNK_ABORT marker so // the receiver can fault the pending invocation cleanly while the connection stays alive. // Blast radius = one message. // Edge case (deferred): if AsyncPipeWriterOutput fails after committing a [201][UINT16=N] // header but before writing all N data bytes, the receiver parses [203] as part of that // chunk's data, not as the abort marker. Rare (exceptions typically throw mid-data, not // mid-header), and reaches the protocol-violation path in TryParseChunkData. Robust fix: // AsyncPipeWriterOutput.Abort() padding the in-flight chunk before emitting [203] — see // BINARY_ASYNCPIPE_TODO. try { // Heterogeneous `object?` arg — pass the runtime type explicitly so the wire payload // carries the concrete type's encoding (the generic overload would infer T = object // and emit an object-typed body — the bug behind the 320 SignalR test regressions). var streamedRuntimeType = streamedArg?.GetType() ?? typeof(object); dataBytes = AcBinarySerializer.Serialize(streamedArg, streamedRuntimeType, pipeWriter, _options, _flushPolicy, _flushTimeout); _logger?.LogDebug("WriteMessageChunked CHUNK_DATA + CHUNK_END emitted via AsyncPipeWriterOutput dataBytes={DataBytes}", dataBytes); } catch (Exception serializeEx) { _logger?.LogError(serializeEx, "WriteMessageChunked streamed-arg serialize FAILED — emitting [203] CHUNK_ABORT messageType={MessageType}", message.GetType().Name); if (!TryEmitChunkAbort(pipeWriter)) throw; // pipe dead too — let SignalR abort the connection (baseline behaviour) return; // abort marker on the wire, connection alive, receiver faults the caller } // Total wire bytes = length prefix (4) + CHUNK_START payload + CHUNK_DATA frames + CHUNK_END (1) // Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data. // The +1 at the end is the [202] CHUNK_END marker (now written by AsyncPipeWriterOutput.Flush()). var chunkSize = _options.BufferWriterChunkSize; var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0; var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1; _logger?.LogInformation("Serialize end (chunked) dataBytes={DataBytes} chunkCount={ChunkCount} totalSentSize={TotalSentSize}", dataBytes, chunkCount, totalSentSize); } /// /// Writes arguments for CHUNK_START: all args normally except the streamed one (INT32 -1 marker). /// private void WriteArgumentsChunked(ref BufferWriterBinaryOutput bw, IBufferWriter output, object?[] arguments, int streamedArgIndex, ref int externalBytes) { bw.WriteVarUInt((uint)arguments.Length); for (var i = 0; i < arguments.Length; i++) { if (i == streamedArgIndex) { bw.WriteRaw(-1); // streamed arg placeholder continue; } WriteArgument(ref bw, output, arguments[i], ref externalBytes); } } #endregion #region TryParseMessage public virtual bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message) { message = null; // AsyncSegment chunk mode if (_chunkStates.TryGetValue(binder, out var chunkState)) { // Guard against buffer re-presentation: if SignalR re-submitted the same buffer // (because our previous fallthrough returned false without advancing), // the buffer may still contain: // 1. The already-processed CHUNK_START frame // 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously) // Skip both to avoid duplicate writes to state.Input. if (TrySkipRepresentedChunkStart(ref input)) { _logger?.LogDebug("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length); // Also skip already-consumed chunk frame bytes (re-presented along with CHUNK_START) if (chunkState.ChunkFrameBytesConsumed > 0) { if (input.Length < chunkState.ChunkFrameBytesConsumed) { _logger?.LogWarning("TryParseMessage re-presentation inconsistency: expected >= {Expected} already-consumed bytes but only {Actual} in buffer", chunkState.ChunkFrameBytesConsumed, input.Length); return false; } input = input.Slice(chunkState.ChunkFrameBytesConsumed); _logger?.LogDebug("TryParseMessage skipped {Bytes} already-consumed chunk frame bytes, remainingInput={RemainingInput}", chunkState.ChunkFrameBytesConsumed, input.Length); } } if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("TryParseMessage chunk mode active binderHash={BinderHash} inputLength={InputLength} firstByte={FirstByte}", binder.GetHashCode(), input.Length, input.Length > 0 ? input.FirstSpan[0] : (byte)0); return TryParseChunkData(ref input, chunkState, binder, out message); } // Normal path var reader = new SequenceReader(input); if (!reader.TryReadLittleEndian(out int payloadLength)) return false; if (reader.Remaining < payloadLength) return false; _logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length); _logger?.LogInformation("Deserialize start"); message = ParseMessage(ref reader, payloadLength, binder); if (message != null) { input = input.Slice(LengthPrefixSize + payloadLength); _logger?.LogInformation("Deserialize end"); if (_logger?.IsEnabled(LogLevel.Debug) == true) _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name); return true; } // CHUNK_START consumed but no complete HubMessage yet (chunk mode just activated). // Try to process any remaining chunk data already in the buffer. if (_chunkStates.TryGetValue(binder, out chunkState)) { var afterChunkStart = input.Slice(LengthPrefixSize + payloadLength); if (TryParseChunkData(ref afterChunkStart, chunkState, binder, out message)) { // Full chunked message processed in one call input = afterChunkStart; _logger?.LogDebug("TryParseMessage CHUNK_START + chunk data processed in single call"); return true; } // IMPORTANT: do NOT advance input when returning false. // SignalR's contract is "advance only on success". If we advance here, // the buffer state becomes inconsistent on re-submission. // On next call, the buffer may re-present CHUNK_START bytes; the chunk-mode // block above handles that via TrySkipRepresentedChunkStart. _logger?.LogDebug("TryParseMessage CHUNK_START parsed, state added, waiting for chunk data (not advancing)"); return false; } return false; } /// /// Detects if the buffer starts with a re-presented CHUNK_START frame pattern /// ([INT32 length][CHUNK_START marker]). If so, advances /// past the entire frame and returns true. /// /// This guards against the case where SignalR's buffer management re-presents /// bytes we logically consumed during a previous false-returning TryParseMessage call. /// private static bool TrySkipRepresentedChunkStart(ref ReadOnlySequence input) { if (input.Length < LengthPrefixSize + 1) return false; Span header = stackalloc byte[LengthPrefixSize + 1]; input.Slice(0, LengthPrefixSize + 1).CopyTo(header); int maybeLen = System.Buffers.Binary.BinaryPrimitives.ReadInt32LittleEndian(header.Slice(0, LengthPrefixSize)); byte maybeMarker = header[LengthPrefixSize]; if (maybeMarker != MsgAsyncChunkStart) return false; if (maybeLen <= 0 || input.Length < LengthPrefixSize + maybeLen) return false; input = input.Slice(LengthPrefixSize + maybeLen); return true; } private HubMessage? ParseMessage(ref SequenceReader r, int payloadLength, IInvocationBinder binder) { if (payloadLength == 0) return null; // Mark end position so Parse* methods can check Remaining relative to payload var payloadEnd = r.Consumed + payloadLength; r.TryRead(out var msgType); // The header context (out _) is intentionally discarded on the non-chunked path — // it lives only on the stack frame of the Parse* call and is consumed inline by the // ReadArguments / ReadSingleArgument calls inside that frame. No instance state means // no race even when this protocol instance is shared across threads (NuGet contract). return msgType switch { MsgInvocation => ParseInvocation(ref r, binder, out _), MsgStreamInvocation => ParseStreamInvocation(ref r, binder, out _), MsgStreamItem => ParseStreamItem(ref r, binder, out _), MsgCompletion => ParseCompletion(ref r, binder, out _), MsgCancelInvocation => ParseCancelInvocation(ref r), MsgPing => PingMessage.Instance, MsgClose => ParseClose(ref r), MsgAck => new AckMessage(ReadInt64(ref r)), MsgSequence => new SequenceMessage(ReadInt64(ref r)), MsgAsyncChunkStart => ParseAsyncChunkStart(ref r, binder), _ => null }; } /// /// Legacy diagnostic logger. Use ILogger via constructor instead. /// [Obsolete("Use ILogger via constructor parameter instead. This property will be removed in a future version.")] public static Action? DiagnosticLogger { get; set; } [Conditional("DEBUG")] private void LogDiagnostic(string message) => _logger?.LogDebug(message); [Conditional("DEBUG")] private void LogReadSingleArgument(ReadOnlySequence argSlice, int argLength, Type targetType) { if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; var segmentCount = 0; foreach (var _ in argSlice) segmentCount++; _logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}", argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name); } [Conditional("DEBUG")] private void LogParseInvocation(string target, IReadOnlyList paramTypes, long remaining) { if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; var typeNames = new string[paramTypes.Count]; for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name; _logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}", target, paramTypes.Count, string.Join(", ", typeNames), remaining); } private HubMessage ParseInvocation(ref SequenceReader r, IInvocationBinder binder, out object? headerContext) { headerContext = ReadHeader(ref r); var invocationId = ReadNullableString(ref r); var target = ReadString(ref r); var paramTypes = binder.GetParameterTypes(target); LogParseInvocation(target, paramTypes, r.Remaining); var args = ReadArguments(ref r, paramTypes, headerContext); var streamIds = ReadStringArray(ref r); var headers = ReadHeaders(ref r); var msg = streamIds is { Length: > 0 } ? new InvocationMessage(invocationId, target, args, streamIds) : ApplyInvocationId(new InvocationMessage(target, args), invocationId); if (headers != null) SetHeaders(msg, headers); return msg; } private HubMessage ParseStreamInvocation(ref SequenceReader r, IInvocationBinder binder, out object? headerContext) { headerContext = ReadHeader(ref r); var invocationId = ReadString(ref r); var target = ReadString(ref r); var paramTypes = binder.GetParameterTypes(target); var args = ReadArguments(ref r, paramTypes, headerContext); var streamIds = ReadStringArray(ref r); var headers = ReadHeaders(ref r); var msg = new StreamInvocationMessage(invocationId, target, args, streamIds); if (headers != null) SetHeaders(msg, headers); return msg; } private HubMessage ParseStreamItem(ref SequenceReader r, IInvocationBinder binder, out object? headerContext) { headerContext = ReadHeader(ref r); var invocationId = ReadString(ref r); var itemType = binder.GetStreamItemType(invocationId); var item = ReadSingleArgument(ref r, itemType, headerContext); var headers = ReadHeaders(ref r); var msg = new StreamItemMessage(invocationId, item); if (headers != null) SetHeaders(msg, headers); return msg; } private HubMessage ParseCompletion(ref SequenceReader r, IInvocationBinder binder, out object? headerContext) { headerContext = ReadHeader(ref r); var invocationId = ReadString(ref r); var error = ReadNullableString(ref r); r.TryRead(out var hasResultByte); var hasResult = hasResultByte == 1; object? result = null; if (hasResult) { var resultType = binder.GetReturnType(invocationId); result = ReadSingleArgument(ref r, resultType, headerContext); } var headers = ReadHeaders(ref r); CompletionMessage msg; if (error != null) msg = CompletionMessage.WithError(invocationId, error); else if (hasResult) msg = CompletionMessage.WithResult(invocationId, result); else msg = CompletionMessage.Empty(invocationId); if (headers != null) SetHeaders(msg, headers); return msg; } private static HubMessage ParseCancelInvocation(ref SequenceReader r) { var invocationId = ReadString(ref r); var headers = ReadHeaders(ref r); var msg = new CancelInvocationMessage(invocationId); if (headers != null) SetHeaders(msg, headers); return msg; } private static HubMessage ParseClose(ref SequenceReader r) { var error = ReadNullableString(ref r); r.TryRead(out var reconnectByte); var allowReconnect = reconnectByte == 1; return new CloseMessage(error, allowReconnect); } #endregion #region Chunked Protocol (AsyncSegment read) /// /// Processes CHUNK_DATA and CHUNK_END in chunk accumulation mode. /// Called from TryParseMessage when an active AsyncChunkState exists for this connection. /// Loops over all available chunks — critical because SignalR's while loop exits when /// TryParseMessage returns false, and won't re-enter until new data arrives on the pipe. /// private bool TryParseChunkData(ref ReadOnlySequence input, AsyncChunkState state, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message) { message = null; while (input.Length >= 1) { var firstByte = input.FirstSpan[0]; if (firstByte == MsgAsyncChunkData) // 201 — self-describing data chunk [201][UINT16 size][data] { // Need at least [201][UINT16] if (input.Length < 3) return false; // Read UINT16 chunk data size var headerSlice = input.Slice(1, 2); Span sizeBytes = stackalloc byte[2]; headerSlice.CopyTo(sizeBytes); var chunkDataSize = System.Buffers.Binary.BinaryPrimitives.ReadUInt16LittleEndian(sizeBytes); var totalNeeded = 3 + chunkDataSize; // header (3) + data if (input.Length < totalNeeded) return false; _logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length); // Feed chunk data into AsyncPipeReaderInput for background deserialization. // Note: the input is multiMessage:false — we strip framing here and pass raw data. if (chunkDataSize > 0) { var dataSlice = input.Slice(3, chunkDataSize); foreach (var segment in dataSlice) state.Input.Feed(segment.Span); } // Lazy start: begin background deserialization after first chunk is written. // The deser task reads via AsyncPipeReaderInputAdapter (struct over class) which // calls TryAdvanceSegment on the input — blocks on ManualResetEventSlim.Wait when // out of data. Browser fallback: skip Task.Run — the MRES.Wait throws // PlatformNotSupportedException on WASM. Instead, buffer all chunks and run the // deserializer synchronously on CHUNK_END, where state.Input.Complete() has // already been called → TryAdvanceSegment never enters the Wait path. if (state.DeserTask == null && !IsBrowser) { _logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name); var input2 = state.Input; var type = state.StreamedArgType; var opts = _options; state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input2, type, opts)); } input = input.Slice(totalNeeded); state.ChunkFrameBytesConsumed += totalNeeded; continue; // try next chunk immediately } if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data) { _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion"); // Signal end of data → background deser task completes state.Input.Complete(); object? deserializedArg = null; try { if (state.DeserTask != null) { // Desktop / server: background task has been deserializing concurrently // with chunk arrival (pipeline parallelism). Wait for its result here. deserializedArg = state.DeserTask.GetAwaiter().GetResult(); } else { // Browser (WASM) fallback: run the deserializer synchronously on the // already-buffered input. After Complete() the input's TryAdvanceSegment // returns buffered data immediately and never blocks on // ManualResetEventSlim.Wait (which would throw PlatformNotSupportedException // on WASM). Same struct-adapter path the background task uses; small JIT- // inlined indirection vs. the previous direct byte[] overload — negligible // per-message, and removes the WASM-specific buffer-mutation access. deserializedArg = AcBinaryDeserializer.Deserialize( state.Input, state.StreamedArgType, _options); } _logger?.LogInformation("Deserialize end (chunked)"); if (_logger?.IsEnabled(LogLevel.Debug) == true) _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); } catch (Exception ex) { _logger?.LogError(ex, "TryParseChunkData deserialization FAILED targetType={TargetType}", state.StreamedArgType.Name); throw; } finally { _logger?.LogDebug("TryParseChunkData [202] cleanup: Input.Dispose + _chunkStates.Remove"); state.Input.Dispose(); _chunkStates.Remove(binder); } // Fill the placeholder in the stored message's args FillStreamedArg(state, deserializedArg); input = input.Slice(1); // consume the single [202] byte message = state.PartialMessage; return true; } if (firstByte == MsgAsyncChunkAbort) // 203 — server abandoned the chunked message mid-stream { _logger?.LogWarning("TryParseChunkData [203] CHUNK_ABORT targetType={TargetType} chunkFrameBytesConsumed={ChunkFrameBytesConsumed}", state.StreamedArgType.Name, state.ChunkFrameBytesConsumed); AbandonChunkState(state, binder, reason: "[203] CHUNK_ABORT"); input = input.Slice(1); // consume the [203] byte // Surface the abort to the caller via OnChunkAbort. Default base routing uses // SignalR's InvocationId (CompletionMessage.WithError); derived classes can // override for application-level correlation (e.g. SignalParams.requestId in args). var invocationId = GetInvocationId(state.PartialMessage); message = OnChunkAbort(state.PartialMessage, state.HeaderContext, invocationId); if (message == null) { // No routing target — return PingMessage so the SignalR loop's "consumed input // ↔ produced message" contract holds. True fire-and-forget, or override handed // off routing out-of-band. _logger?.LogWarning("TryParseChunkData [203] OnChunkAbort returned null — returning Ping (no caller to fault)"); message = PingMessage.Instance; } return true; } // Protocol-invariant violation — not [201]/[202]/[203]. Legitimate sender-abort is // handled by the [203] branch above; anything reaching here is genuine framing // corruption (sender bug, version mismatch, or chunk header misread drifting into // data). The old "fallback to normal parse" was guess-and-hope: it can't tell mid-data // garbage from a real next message. Surface as InvalidDataException — SignalR's outer // handler treats it as a transport fault rather than masking it. _logger?.LogError("TryParseChunkData PROTOCOL VIOLATION unknown byte {FirstByte} in chunk mode (expected [201]/[202]/[203]). " + "binderHash={BinderHash} inputLength={InputLength} targetType={TargetType} deserTaskStatus={TaskStatus} chunkFrameBytesConsumed={ChunkFrameBytesConsumed}", firstByte, binder.GetHashCode(), input.Length, state.StreamedArgType.Name, state.DeserTask?.Status.ToString() ?? "null", state.ChunkFrameBytesConsumed); AbandonChunkState(state, binder, reason: $"protocol violation (byte 0x{firstByte:X2})"); throw new System.IO.InvalidDataException( $"AcBinary chunked protocol violation: unexpected byte 0x{firstByte:X2} ({firstByte}) in chunk mode " + $"for targetType={state.StreamedArgType.Name}; expected [201] CHUNK_DATA, [202] CHUNK_END, or [203] CHUNK_ABORT."); } return false; } /// /// Teardown for non-success chunk-state termination (CHUNK_ABORT or protocol violation). /// Mirrors the CHUNK_END path's Complete → await-deser → Dispose → Remove ordering, but /// observes the background deser task's failure (likely on partial input) without rethrowing — /// the abort/violation is the authoritative outcome, the deser exception is a derived effect. /// private void AbandonChunkState(AsyncChunkState state, IInvocationBinder binder, string reason) { state.Input.Complete(); if (state.DeserTask != null) { try { state.DeserTask.GetAwaiter().GetResult(); } catch (Exception deserEx) { _logger?.LogDebug(deserEx, "AbandonChunkState ({Reason}): background deser task faulted on partial input (expected)", reason); } } state.Input.Dispose(); _chunkStates.Remove(binder); } /// /// Extracts the InvocationId from a chunked-mode , /// covering all message types that flow through . Returns /// null for unexpected types (defensive — shouldn't occur in normal use). /// private static string? GetInvocationId(HubMessage message) => message switch { InvocationMessage im => im.InvocationId, StreamInvocationMessage sim => sim.InvocationId, StreamItemMessage stim => stim.InvocationId, CompletionMessage cm => cm.InvocationId, _ => null }; /// /// Best-effort emit of the [203] CHUNK_ABORT marker after a serialize failure in /// . Returns true if the byte was successfully written /// and flushed; false if even the abort emit failed (transport-level fault) — the /// caller should then rethrow to fall back to connection-level abort behaviour. /// private bool TryEmitChunkAbort(PipeWriter pipeWriter) { try { var abortSpan = pipeWriter.GetSpan(1); abortSpan[0] = MsgAsyncChunkAbort; pipeWriter.Advance(1); SyncFlush(pipeWriter.FlushAsync()); _logger?.LogDebug("WriteMessageChunked [203] CHUNK_ABORT emitted (graceful fault isolation)"); return true; } catch (Exception abortEx) { _logger?.LogError(abortEx, "WriteMessageChunked failed to emit [203] CHUNK_ABORT — falling back to connection abort"); return false; } } /// /// Called by the CHUNK_ABORT [203] receive branch in to /// produce a HubMessage that surfaces the abort to the awaiting caller. /// Default base implementation: returns if /// is non-null (SignalR-level routing — the awaiting task is /// faulted with the embedded error). Returns null for fire-and-forget invocations (no /// SignalR InvocationId), in which case the caller falls back to /// and the abort is not propagated to any specific waiter. /// Derived classes can override to synthesise an application-level error response — e.g. /// when the protocol uses a custom request/response correlation in the message arguments /// (rather than SignalR's InvocationId), the override can build an /// that routes to the caller's application-level error path. Returning a non-null result short-circuits /// the base SignalR-level routing; returning null explicitly hands off to the Ping fallback /// (signalling "the abort was handled out-of-band or has no addressable caller"). /// /// The original message that activated chunk mode (with the streamed-arg placeholder still in place). /// The opaque header context produced by for this message. /// The SignalR-level InvocationId, or null for fire-and-forget messages. /// A HubMessage to surface to the SignalR loop, or null to fall back to . protected virtual HubMessage? OnChunkAbort(HubMessage partialMessage, object? headerContext, string? invocationId) { if (string.IsNullOrEmpty(invocationId)) return null; return CompletionMessage.WithError(invocationId, "Server abandoned the chunked response (remote serialize failure — see server logs)."); } /// /// Parses CHUNK_START: reads original message (with -1 marker for streamed arg), /// creates , stores state. Background deser task starts lazily on first chunk. /// Returns null to signal "consumed bytes, no complete message yet". /// private HubMessage? ParseAsyncChunkStart(ref SequenceReader r, IInvocationBinder binder) { r.TryRead(out var originalMsgType); _logger?.LogDebug("ParseAsyncChunkStart innerMsgType={InnerMsgType}", originalMsgType); // Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments. // The header context returned by Parse* is captured locally and persisted on the per-binder // AsyncChunkState below, so it survives the CHUNK_START → CHUNK_DATA × N → CHUNK_END boundary // without any shared instance state (race-mentes on a shared protocol instance). HubMessage? partialMessage; object? headerContext; switch (originalMsgType) { case MsgInvocation: partialMessage = ParseInvocation(ref r, binder, out headerContext); break; case MsgStreamInvocation: partialMessage = ParseStreamInvocation(ref r, binder, out headerContext); break; case MsgStreamItem: partialMessage = ParseStreamItem(ref r, binder, out headerContext); break; case MsgCompletion: partialMessage = ParseCompletion(ref r, binder, out headerContext); break; default: return null; } if (partialMessage == null) return null; // Find the placeholder arg and its target type var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder); // Derived classes can override ResolveStreamedArgType to consult the header context // (returned by ReadHeader) for per-message type resolution. streamedType = ResolveStreamedArgType(streamedType, headerContext); _logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}", streamedIndex, streamedType.Name); var state = new AsyncChunkState { PartialMessage = partialMessage, Args = args, StreamedArgIndex = streamedIndex, StreamedArgType = streamedType, HeaderContext = headerContext, // multiMessage: false — SignalR's TryParseChunkData parses [201]/[202] framing externally // and feeds raw data bytes into the input. The framing-state-machine inside // AsyncPipeReaderInput is not used on this code path. Input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2, multiMessage: false) // DeserTask started lazily in TryParseChunkData after first chunk is written }; _chunkStates.AddOrUpdate(binder, state); _logger?.LogDebug("ParseAsyncChunkStart _chunkStates.AddOrUpdate binderHash={BinderHash} streamedArgType={TargetType}", binder.GetHashCode(), streamedType.Name); return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData } /// /// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array, /// placeholder index, and the target deserialization type. /// private static (object?[] args, int index, Type type) FindStreamedArgSlot(HubMessage message, IInvocationBinder binder) { switch (message) { case InvocationMessage inv: { var paramTypes = binder.GetParameterTypes(inv.Target); for (var i = 0; i < inv.Arguments.Length; i++) { if (!ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) continue; var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); return (inv.Arguments, i, type); } break; } case StreamInvocationMessage sinv: { var paramTypes = binder.GetParameterTypes(sinv.Target); for (var i = 0; i < sinv.Arguments.Length; i++) { if (!ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) continue; var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); return (sinv.Arguments, i, type); } break; } case StreamItemMessage si: { if (ReferenceEquals(si.Item, StreamedArgPlaceholder)) { // StreamItemMessage.Item is read-only, use a wrapper array var args = new object?[] { si.Item }; var type = binder.GetStreamItemType(si.InvocationId!); return (args, 0, type); } break; } case CompletionMessage comp: { if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder)) { var args = new object?[] { comp.Result }; var type = binder.GetReturnType(comp.InvocationId!); return (args, 0, type); } break; } } return ([], -1, typeof(object)); } /// /// Replaces the StreamedArgPlaceholder with the deserialized value in the stored message. /// private static void FillStreamedArg(AsyncChunkState state, object? deserializedValue) { if (state.StreamedArgIndex < 0) return; switch (state.PartialMessage) { case InvocationMessage inv: inv.Arguments[state.StreamedArgIndex] = deserializedValue; break; case StreamInvocationMessage sinv: sinv.Arguments[state.StreamedArgIndex] = deserializedValue; break; case StreamItemMessage: // StreamItemMessage.Item has no public setter — need to create a new message if (state.PartialMessage is StreamItemMessage si) state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue); break; case CompletionMessage: // CompletionMessage.Result has no public setter — need to create a new message if (state.PartialMessage is CompletionMessage comp) state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue); break; } } #endregion #region Argument Serialization private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter output, object?[] arguments, ref int externalBytes) { bw.WriteVarUInt((uint)arguments.Length); for (var i = 0; i < arguments.Length; i++) WriteArgument(ref bw, output, arguments[i], ref externalBytes); } private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter output, object? value, ref int externalBytes) { // byte[] fast-path: size known upfront, write entirely through BWO if (value is byte[] byteArray) { var isAcBinary = byteArray.Length >= 2 && byteArray[0] == AcBinarySerializerOptions.FormatVersion && (byteArray[1] & 0xF0) == BinaryTypeCode.HeaderFlagsBase; if (isAcBinary) { // Already AcBinary-serialized: write raw length + bytes, no tag wrapper bw.WriteRaw(byteArray.Length); } else { // Raw byte[] (image, file, etc.): tag + raw bytes, no VarUInt (argLength implies size) bw.WriteRaw(1 + byteArray.Length); bw.WriteByte(BinaryTypeCode.ByteArray); } bw.WriteBytes(byteArray); return; } // Runtime type for the heterogeneous `object?` arg — preserves polymorphism on the wire // (the generic ToBinary() overload would infer T = object, losing the concrete type). // Null-safe fallback to typeof(object); the underlying Serialize early-returns the Null // marker for null values. var runtimeType = value?.GetType() ?? typeof(object); // Bytes mode: serialize to byte[], write through BWO (no FlushAndReset needed) if (_protocolMode == BinaryProtocolMode.Bytes) { var serialized = value.ToBinary(runtimeType, _options); bw.WriteRaw(serialized.Length); bw.WriteBytes(serialized); DebugLogArgument(runtimeType, serialized.Length, value); return; } // Segment mode: serialize directly to the pipe via BufferWriterBinaryOutput // (AsyncSegment goes through WriteMessageChunked, never reaches here) bw.FlushAndReset(); // Reserve arg length prefix directly on the pipe var argLenSpan = output.GetSpan(LengthPrefixSize); output.Advance(LengthPrefixSize); // ToBinary(Type, IBufferWriter, options) doesn't return the byte count — call the // type-explicit serializer overload directly to capture argBytes for the length prefix. var argBytes = AcBinarySerializer.Serialize(value, runtimeType, output, _options); Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes); externalBytes += LengthPrefixSize + argBytes; DebugLogArgument(runtimeType, argBytes, value); } [Conditional("DEBUG")] protected void DebugLogArgument(Type runtimeType, int argBytes, object? value) { var kind = value switch { null => "null", System.Collections.IDictionary => "dictionary", System.Collections.IEnumerable when value is not string => "collection", _ => "scalar" }; _logger?.LogDebug("WriteArgument runtimeType={RuntimeType} argBytes={ArgBytes} valueIsNull={ValueIsNull} valueTypeKind={Kind}", runtimeType.FullName, argBytes, value == null, kind); Console.WriteLine($"[DEBUG] WriteArgument runtimeType={runtimeType.FullName} argBytes={argBytes} valueIsNull={value == null} kind={kind}"); } private object?[] ReadArguments(ref SequenceReader r, IReadOnlyList paramTypes, object? headerContext) { var count = (int)ReadVarUInt(ref r); LogDiagnostic($"[AcBinaryHubProtocol] ReadArguments count={count}; remaining={r.Remaining}"); var args = new object?[count]; for (var i = 0; i < count; i++) { var targetType = i < paramTypes.Count ? paramTypes[i] : typeof(object); LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}"); args[i] = ReadSingleArgument(ref r, targetType, headerContext); OnArgumentRead(args[i], i); } return args; } protected virtual void OnArgumentRead(object? value, int index) { } /// /// Override to resolve typeof(object) to a concrete type. Called after FindStreamedArgSlot in /// chunked deserialization with the header context returned by for /// the same message — derived classes can use it for per-message type resolution without /// touching shared instance state. /// protected virtual Type ResolveStreamedArgType(Type binderType, object? headerContext) => binderType; /// /// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer. /// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[]. /// The is the opaque object returned by /// for the same message — derived classes can use it to drive per-message decoding decisions /// (e.g. raw-bytes vs typed deserialization, target-type override) without touching shared /// instance state. /// protected virtual object? ReadSingleArgument(ref SequenceReader r, Type targetType, object? headerContext) { r.TryReadLittleEndian(out int argLength); if (argLength == 0) return null; // AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization if (argLength == -1) { _logger?.LogTrace("ReadSingleArgument streamed arg marker (-1) → placeholder"); return StreamedArgPlaceholder; } // Null marker check if (argLength == 1) { r.TryPeek(out var marker); if (marker == 0) { r.Advance(1); return null; } } // Slice argument from pipe sequence — zero-copy reference var argSlice = r.UnreadSequence.Slice(0, argLength); r.Advance(argLength); LogReadSingleArgument(argSlice, argLength, targetType); // byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag → // strip tag, rest is raw payload. No VarUInt length (argLength implies size). var argReader = new SequenceReader(argSlice); if (argReader.TryPeek(out var tag) && tag == BinaryTypeCode.ByteArray) { return SequenceToByteArray(argSlice.Slice(1)); } // Unified non-chunked receive path: always ArrayBinaryInput via offset-aware overload. // Single-segment: zero-copy on the pipe's slab. Multi-segment: pool-rented copy. // _protocolMode no longer affects the receive side — it is only a send-side strategy. var (arr, offset, length, rented) = GetArgBytes(argSlice); try { return AcBinaryDeserializer.Deserialize(arr, offset, length, targetType, _options); } finally { if (rented) ArrayPool.Shared.Return(arr); } } /// /// Returns raw byte[] from the pipe sequence without any deserialization. /// Zero-copy when single-segment (TryGetArray), copies only for rare multi-segment. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] protected static byte[] SequenceToByteArray(ReadOnlySequence data) { if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg) && seg.Offset == 0 && seg.Count == seg.Array!.Length) return seg.Array; return data.ToArray(); } /// /// Exposes argSlice bytes as (array, offset, length) for offset-aware /// . /// /// Single-segment: zero-copy via — no allocation, no copy. /// Multi-segment: -rented contiguous copy; caller MUST return /// the array via when rented is true. /// /// Enables ArrayBinaryInput (fastest — JIT-eliminates the TryAdvanceSegment branch) regardless /// of whether the pipe delivered the payload as a single slab or multiple. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] protected static (byte[] array, int offset, int length, bool rented) GetArgBytes(ReadOnlySequence argSlice) { if (argSlice.IsSingleSegment && MemoryMarshal.TryGetArray(argSlice.First, out var seg)) return (seg.Array!, seg.Offset, seg.Count, rented: false); var length = (int)argSlice.Length; var rentedBuf = ArrayPool.Shared.Rent(length); argSlice.CopyTo(rentedBuf); return (rentedBuf, 0, length, rented: true); } #endregion #region Write Framing Helpers [MethodImpl(MethodImplOptions.AggressiveInlining)] protected static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value) { if (value == null) { bw.WriteByte(0); return; } bw.WriteByte(1); bw.WriteStringUtf8(value); } private static void WriteStringArray(ref BufferWriterBinaryOutput bw, string[]? array) { if (array == null || array.Length == 0) { bw.WriteVarUInt(0); return; } bw.WriteVarUInt((uint)array.Length); for (var i = 0; i < array.Length; i++) bw.WriteStringUtf8(array[i]); } private static void WriteHeaders(ref BufferWriterBinaryOutput bw, IDictionary? headers) { if (headers == null || headers.Count == 0) { bw.WriteVarUInt(0); return; } bw.WriteVarUInt((uint)headers.Count); foreach (var kv in headers) { bw.WriteStringUtf8(kv.Key); bw.WriteStringUtf8(kv.Value); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static int VarUIntSize(uint value) { if (value < 0x80) return 1; if (value < 0x4000) return 2; if (value < 0x200000) return 3; if (value < 0x10000000) return 4; return 5; } #endregion #region Sequence Read Helpers [MethodImpl(MethodImplOptions.AggressiveInlining)] private static long ReadInt64(ref SequenceReader r) { r.TryReadLittleEndian(out long v); return v; } /// /// Prefix-tier VarUInt decode (UTF-8-style). MUST stay symmetric with the write-side /// and /// . The previous LEB128 implementation /// became wire-format-mismatched after the V3P9 prefix-tier VarUInt rewrite — root cause /// of the SignalR test regressions. /// First-byte prefix → total size: 0xxxxxxx (1B) | 10xxxxxx (2B) | 110xxxxx (3B) | 1110xxxx (4B) | 1111xxxx (5B). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] protected static uint ReadVarUInt(ref SequenceReader r) { if (!r.TryRead(out var b0)) return 0; if (b0 < 0x80) return b0; // 2-byte tier if (!r.TryRead(out var b1)) return 0; if (b0 < 0xC0) return ((uint)(b0 & 0x3F) << 8) | b1; // 3-byte tier if (!r.TryRead(out var b2)) return 0; if (b0 < 0xE0) return ((uint)(b0 & 0x1F) << 16) | ((uint)b2 << 8) | b1; // 4-byte tier if (!r.TryRead(out var b3)) return 0; if (b0 < 0xF0) return ((uint)(b0 & 0x0F) << 24) | ((uint)b3 << 16) | ((uint)b2 << 8) | b1; // 5-byte tier (prefix nibble unused) if (!r.TryRead(out var b4)) return 0; return ((uint)b4 << 24) | ((uint)b3 << 16) | ((uint)b2 << 8) | b1; } protected static string ReadString(ref SequenceReader r) { var byteCount = (int)ReadVarUInt(ref r); if (byteCount == 0) return string.Empty; r.TryReadExact(byteCount, out var bytes); return bytes.IsSingleSegment ? Encoding.UTF8.GetString(bytes.FirstSpan) : Encoding.UTF8.GetString(bytes.ToArray()); } protected static string? ReadNullableString(ref SequenceReader r) { r.TryRead(out var marker); return marker == 0 ? null : ReadString(ref r); } private static string[]? ReadStringArray(ref SequenceReader r) { var count = (int)ReadVarUInt(ref r); if (count == 0) return null; var array = new string[count]; for (var i = 0; i < count; i++) array[i] = ReadString(ref r); return array; } private static Dictionary? ReadHeaders(ref SequenceReader r) { if (r.Remaining == 0) return null; var count = (int)ReadVarUInt(ref r); if (count == 0) return null; var headers = new Dictionary(count, StringComparer.Ordinal); for (var i = 0; i < count; i++) { var key = ReadString(ref r); var value = ReadString(ref r); headers[key] = value; } return headers; } #endregion #region Helpers private static InvocationMessage ApplyInvocationId(InvocationMessage msg, string? invocationId) { if (invocationId != null) return new InvocationMessage(invocationId, msg.Target, msg.Arguments); return msg; } private static void SetHeaders(HubMessage msg, Dictionary headers) { if (msg is HubInvocationMessage invMsg) invMsg.Headers = headers; } #endregion }