AyCode.Core/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs

1380 lines
55 KiB
C#

using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
namespace AyCode.Services.SignalRs;
/// <summary>
/// Custom SignalR hub protocol using AcBinarySerializer for wire format.
/// Eliminates JSON+Base64 overhead by serializing all HubMessages directly to binary.
///
/// Wire format per message:
/// [4 bytes: payload length (little-endian)] [payload bytes]
///
/// Payload structure:
/// [1 byte: message type] [message-specific fields serialized via AcBinary]
///
/// Message types map 1:1 to SignalR HubMessageType values.
/// Arguments are serialized individually with an INT32 length prefix each,
/// enabling deferred deserialization via IHubProtocol's binder pattern.
///
/// Write path: BufferWriterBinaryOutput for zero virtual dispatch on the hot path.
/// Argument payloads serialized directly to the pipe via AcBinarySerializer (zero-copy write).
///
/// Read path: SequenceReader&lt;byte&gt; reads directly from the pipe's ReadOnlySequence.
/// Argument deserialization uses the pipe's backing byte[] via TryGetArray (zero-copy read).
/// </summary>
public class AcBinaryHubProtocol : IHubProtocol
{
private const int LengthPrefixSize = 4;
// Message type markers (matching HubMessageType enum values)
private const byte MsgInvocation = 1;
private const byte MsgStreamItem = 2;
private const byte MsgCompletion = 3;
private const byte MsgStreamInvocation = 4;
private const byte MsgCancelInvocation = 5;
private const byte MsgPing = 6;
private const byte MsgClose = 7;
private const byte MsgAck = 8;
private const byte MsgSequence = 9;
// Chunked protocol framing for AsyncSegment mode
private const byte MsgAsyncChunkStart = 200;
private const byte MsgAsyncChunkData = 201;
private const byte MsgAsyncChunkEnd = 202;
/// <summary>Sentinel object placed in the args array for the streamed argument (replaced after chunk deserialization).</summary>
protected static readonly object StreamedArgPlaceholder = new();
/// <summary>
/// True when running on a browser (WebAssembly) runtime. Cached at type-load because
/// the value is invariant per process and the check is used on hot paths.
/// <para>
/// Browser implications:
/// <list type="bullet">
/// <item>Send path: <c>AsyncSegment</c> is unsupported (sync-over-async flush blocks the single UI thread).</item>
/// <item>Receive path: when chunked wire arrives, background <c>Task.Run</c> is skipped;
/// the deserializer runs synchronously on <c>CHUNK_END</c> over the already-buffered data
/// (<see cref="SegmentBufferReader"/>'s <c>ManualResetEventSlim.Wait()</c> would throw
/// <see cref="PlatformNotSupportedException"/>).</item>
/// </list>
/// </para>
/// </summary>
private static readonly bool IsBrowser = OperatingSystem.IsBrowser();
protected volatile AcBinarySerializerOptions _options;
protected readonly BinaryProtocolMode _protocolMode;
protected readonly ILogger? _logger;
/// <summary>
/// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).
/// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server.
/// </summary>
private readonly ConditionalWeakTable<IInvocationBinder, AsyncChunkState> _chunkStates;
/// <summary>
/// Opaque context produced by <see cref="ReadHeader"/> for the currently-parsed message.
/// Set by parse methods (ParseInvocation, ParseStreamInvocation, ParseStreamItem, ParseCompletion)
/// right after reading the per-message header. Derived protocols can read this to customize
/// argument deserialization (e.g., type resolution when <c>targetType == typeof(object)</c>).
/// </summary>
protected object? _currentHeaderContext;
private sealed class AsyncChunkState
{
public HubMessage PartialMessage = null!;
public object?[] Args = null!;
public int StreamedArgIndex;
public Type StreamedArgType = null!;
public SegmentBufferReader Buffer = null!;
public Task<object?>? DeserTask;
/// <summary>
/// Total bytes of chunk frame data already consumed from the input stream
/// (including [201][UINT16] framing headers + data bytes).
/// Used to skip already-processed chunks when SignalR re-presents the buffer
/// after a false-returning TryParseMessage call.
/// </summary>
public int ChunkFrameBytesConsumed;
}
public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null)
{
// Send-side guard: AsyncSegment uses AsyncPipeWriterOutput whose sync-over-async flush
// would block the browser's single UI thread. The receive side converts chunked wire
// to a synchronous deserialize on WASM automatically (see TryParseChunkData).
if (IsBrowser && protocolMode == BinaryProtocolMode.AsyncSegment)
throw new PlatformNotSupportedException(
"BinaryProtocolMode.AsyncSegment is not supported on WebAssembly. " +
"Use BinaryProtocolMode.Bytes or BinaryProtocolMode.Segment instead.");
_options = options;
_options.BufferWriterChunkSize = 4096;
_protocolMode = protocolMode;
_logger = logger;
_chunkStates = new ConditionalWeakTable<IInvocationBinder, AsyncChunkState>();
if (_logger != null)
{
_logger.LogInformation(
"AcBinaryHubProtocol initialized mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}",
_protocolMode, IsBrowser, _options.BufferWriterChunkSize, _options.InitialBufferCapacity,
_options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression);
}
}
/// <summary>
/// Runtime-replaceable serializer options.
/// Thread-safe: uses volatile field, callers see the new options on next message.
/// </summary>
public AcBinarySerializerOptions Options
{
get => _options;
set => _options = value;
}
public string Name => "acbinary";
public int Version => 1;
public TransferFormat TransferFormat => TransferFormat.Binary;
/// <summary>
/// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask.
/// Fast-path: if already completed (no backpressure), returns directly without Task allocation.
/// Slow-path: converts to Task for proper blocking when pipe backpressure is active.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static FlushResult SyncFlush(ValueTask<FlushResult> vt)
=> vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsVersionSupported(int version) => version <= Version;
#region Extensibility Hooks
/// <summary>
/// Called right after the message type byte (both chunked and non-chunked paths).
/// Derived protocols can write extra header fields here (e.g., a type AQN for untyped args).
/// <para>
/// Default implementation writes nothing — base protocol is fully generic and has no per-message
/// extra state. Derived classes <b>must</b> read exactly the same bytes in <see cref="ReadHeader"/>.
/// </para>
/// </summary>
/// <param name="bw">Output writer (same one used for the message payload).</param>
/// <param name="message">The message being written.</param>
/// <param name="streamedArg">
/// When the chunked path activates, this is the actual argument being streamed (so the derived
/// class can use its concrete runtime type). <c>null</c> for non-chunked messages.
/// </param>
protected virtual void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
{
// Base: no extra header.
}
/// <summary>
/// Reads the per-message header written by <see cref="WriteHeader"/> on the sender side.
/// Called right after the message type byte has been consumed.
/// <para>
/// Returns an opaque context object that is stored in <see cref="_currentHeaderContext"/>
/// for derived classes to consume during the rest of the parse.
/// Default implementation returns <c>null</c>.
/// </para>
/// </summary>
protected virtual object? ReadHeader(ref SequenceReader<byte> r) => null;
#endregion
#region WriteMessage
public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
{
// +LengthPrefixSize: prevents ArrayBufferWriter resize on first GetMemory,
// which would invalidate the length prefix span obtained before Advance.
var writer = new ArrayBufferWriter<byte>(_options.BufferWriterChunkSize + LengthPrefixSize);
WriteMessage(message, writer);
return writer.WrittenMemory;
}
public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
{
if (_logger != null)
{
_logger.LogInformation("Serialize start");
}
// AsyncSegment: chunked protocol framing for messages with streamable arguments
if (_protocolMode == BinaryProtocolMode.AsyncSegment
&& output is PipeWriter pipeWriter
&& HasStreamableArgs(message))
{
WriteMessageChunked(message, pipeWriter);
return;
}
// Reserve outer length prefix directly on the pipe (before BWO takes over)
var lengthSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize);
var externalBytes = 0;
switch (message)
{
case InvocationMessage m:
WriteInvocation(ref bw, output, m, ref externalBytes);
break;
case StreamInvocationMessage m:
WriteStreamInvocation(ref bw, output, m, ref externalBytes);
break;
case StreamItemMessage m:
WriteStreamItem(ref bw, output, m, ref externalBytes);
break;
case CompletionMessage m:
WriteCompletion(ref bw, output, m, ref externalBytes);
break;
case CancelInvocationMessage m:
WriteCancelInvocation(ref bw, m);
break;
case PingMessage:
bw.WriteByte(MsgPing);
break;
case CloseMessage m:
WriteClose(ref bw, m);
break;
case AckMessage m:
bw.WriteByte(MsgAck);
bw.WriteRaw(m.SequenceId);
break;
case SequenceMessage m:
bw.WriteByte(MsgSequence);
bw.WriteRaw(m.SequenceId);
break;
default:
throw new HubException($"Unexpected message type: {message.GetType().Name}");
}
var totalPayload = bw.Position + externalBytes;
bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
if (_logger != null)
{
_logger.LogInformation("Serialize end totalSentSize={TotalSentSize}", LengthPrefixSize + totalPayload);
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload);
}
}
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgInvocation);
WriteHeader(ref bw, m, streamedArg: null);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamInvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamInvocation);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamItemMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamItem);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
WriteArgument(ref bw, output, m.Item, ref externalBytes);
WriteHeaders(ref bw, m.Headers);
}
private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, CompletionMessage m, ref int externalBytes)
{
bw.WriteByte(MsgCompletion);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
WriteNullableString(ref bw, m.Error);
var hasResult = m.HasResult;
bw.WriteByte(hasResult ? (byte)1 : (byte)0);
if (hasResult)
WriteArgument(ref bw, output, m.Result, ref externalBytes);
WriteHeaders(ref bw, m.Headers);
}
private static void WriteCancelInvocation(ref BufferWriterBinaryOutput bw, CancelInvocationMessage m)
{
bw.WriteByte(MsgCancelInvocation);
bw.WriteStringUtf8(m.InvocationId!);
WriteHeaders(ref bw, m.Headers);
}
private static void WriteClose(ref BufferWriterBinaryOutput bw, CloseMessage m)
{
bw.WriteByte(MsgClose);
WriteNullableString(ref bw, m.Error);
bw.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0);
}
#endregion
#region Chunked Protocol (AsyncSegment write)
/// <summary>
/// Returns true if the message has arguments that should be streamed via chunked protocol.
/// Only non-null, non-byte[] arguments go through the chunked path.
/// </summary>
private static bool HasStreamableArgs(HubMessage message) => message switch
{
InvocationMessage m => HasNonByteArrayArg(m.Arguments),
StreamInvocationMessage m => HasNonByteArrayArg(m.Arguments),
StreamItemMessage m => m.Item != null && m.Item is not byte[],
CompletionMessage m => m.HasResult && m.Result != null && m.Result is not byte[],
_ => false
};
private static bool HasNonByteArrayArg(object?[] args)
{
for (var i = args.Length - 1; i >= 0; i--)
{
if (args[i] != null && args[i] is not byte[])
return true;
}
return false;
}
/// <summary>
/// Gets the last non-null, non-byte[] argument value and its index for streaming.
/// </summary>
private static (object? value, int index) GetStreamedArg(HubMessage message) => message switch
{
InvocationMessage m => GetLastNonByteArrayArg(m.Arguments),
StreamInvocationMessage m => GetLastNonByteArrayArg(m.Arguments),
StreamItemMessage m => (m.Item, 0),
CompletionMessage m => (m.Result, 0),
_ => (null, -1)
};
private static (object? value, int index) GetLastNonByteArrayArg(object?[] args)
{
for (var i = args.Length - 1; i >= 0; i--)
{
if (args[i] != null && args[i] is not byte[])
return (args[i], i);
}
return (null, -1);
}
/// <summary>
/// Writes a message using chunked protocol framing for AsyncSegment mode.
/// CHUNK_START: standard SignalR framed message with INT32 -1 for the streamed arg.
/// CHUNK_DATA: [201][UINT16 size][data] per chunk (written by AsyncPipeWriterOutput, zero-copy).
/// CHUNK_END: [202] (1 byte, no data — all data already committed by output).
/// </summary>
private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter)
{
var (streamedArg, streamedArgIndex) = GetStreamedArg(message);
if (_logger?.IsEnabled(LogLevel.Debug) == true)
_logger.LogDebug("WriteMessageChunked {MessageType} streamedArgIndex={StreamedArgIndex} streamedArgType={StreamedArgType}",
message.GetType().Name, streamedArgIndex, streamedArg?.GetType().Name ?? "null");
int chunkStartPayload;
var dataBytes = 0;
// --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) ---
{
var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize);
pipeWriter.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize);
var externalBytes = 0;
bw.WriteByte(MsgAsyncChunkStart);
// Write original message body with INT32 -1 for the streamed arg
switch (message)
{
case InvocationMessage m:
bw.WriteByte(MsgInvocation);
WriteHeader(ref bw, m, streamedArg);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
break;
case StreamInvocationMessage m:
bw.WriteByte(MsgStreamInvocation);
WriteHeader(ref bw, m, streamedArg);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
break;
case StreamItemMessage m:
bw.WriteByte(MsgStreamItem);
WriteHeader(ref bw, m, streamedArg);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteRaw(-1); // streamed arg marker
WriteHeaders(ref bw, m.Headers);
break;
case CompletionMessage m:
bw.WriteByte(MsgCompletion);
WriteHeader(ref bw, m, streamedArg);
bw.WriteStringUtf8(m.InvocationId!);
WriteNullableString(ref bw, m.Error);
bw.WriteByte(1); // hasResult = true
bw.WriteRaw(-1); // streamed arg marker
WriteHeaders(ref bw, m.Headers);
break;
}
chunkStartPayload = bw.Position + externalBytes;
bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], chunkStartPayload);
_logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload);
}
SyncFlush(pipeWriter.FlushAsync());
// --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
if (streamedArg != null)
{
dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options);
_logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes);
}
// --- CHUNK_END [202] ---
var endByte = pipeWriter.GetSpan(1);
endByte[0] = MsgAsyncChunkEnd;
pipeWriter.Advance(1);
SyncFlush(pipeWriter.FlushAsync());
_logger?.LogTrace("WriteMessageChunked CHUNK_END written");
// Total wire bytes = length prefix (4) + CHUNK_START payload + CHUNK_DATA frames + CHUNK_END (1)
// Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data
var chunkSize = _options.BufferWriterChunkSize;
var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0;
var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1;
if (_logger != null)
{
_logger.LogInformation("Serialize end (chunked) dataBytes={DataBytes} chunkCount={ChunkCount} totalSentSize={TotalSentSize}",
dataBytes, chunkCount, totalSentSize);
}
}
/// <summary>
/// Writes arguments for CHUNK_START: all args normally except the streamed one (INT32 -1 marker).
/// </summary>
private void WriteArgumentsChunked(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output,
object?[] arguments, int streamedArgIndex, ref int externalBytes)
{
bw.WriteVarUInt((uint)arguments.Length);
for (var i = 0; i < arguments.Length; i++)
{
if (i == streamedArgIndex)
{
bw.WriteRaw(-1); // streamed arg placeholder
continue;
}
WriteArgument(ref bw, output, arguments[i], ref externalBytes);
}
}
#endregion
#region TryParseMessage
public virtual bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
{
message = null;
// AsyncSegment chunk mode
if (_chunkStates.TryGetValue(binder, out var chunkState))
{
// Guard against buffer re-presentation: if SignalR re-submitted the same buffer
// (because our previous fallthrough returned false without advancing),
// the buffer may still contain:
// 1. The already-processed CHUNK_START frame
// 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously)
// Skip both to avoid duplicate writes to state.Buffer.
if (TrySkipRepresentedChunkStart(ref input))
{
_logger?.LogDebug("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length);
// Also skip already-consumed chunk frame bytes (re-presented along with CHUNK_START)
if (chunkState.ChunkFrameBytesConsumed > 0)
{
if (input.Length < chunkState.ChunkFrameBytesConsumed)
{
_logger?.LogWarning("TryParseMessage re-presentation inconsistency: expected >= {Expected} already-consumed bytes but only {Actual} in buffer",
chunkState.ChunkFrameBytesConsumed, input.Length);
return false;
}
input = input.Slice(chunkState.ChunkFrameBytesConsumed);
_logger?.LogDebug("TryParseMessage skipped {Bytes} already-consumed chunk frame bytes, remainingInput={RemainingInput}",
chunkState.ChunkFrameBytesConsumed, input.Length);
}
}
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("TryParseMessage chunk mode active binderHash={BinderHash} inputLength={InputLength} firstByte={FirstByte}",
binder.GetHashCode(), input.Length, input.Length > 0 ? input.FirstSpan[0] : (byte)0);
return TryParseChunkData(ref input, chunkState, binder, out message);
}
// Normal path
var reader = new SequenceReader<byte>(input);
if (!reader.TryReadLittleEndian(out int payloadLength))
return false;
if (reader.Remaining < payloadLength)
return false;
_logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length);
if (_logger != null)
{
_logger.LogInformation("Deserialize start");
}
message = ParseMessage(ref reader, payloadLength, binder);
if (message != null)
{
input = input.Slice(LengthPrefixSize + payloadLength);
if (_logger != null)
{
_logger.LogInformation("Deserialize end");
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name);
}
return true;
}
// CHUNK_START consumed but no complete HubMessage yet (chunk mode just activated).
// Try to process any remaining chunk data already in the buffer.
if (_chunkStates.TryGetValue(binder, out chunkState))
{
var afterChunkStart = input.Slice(LengthPrefixSize + payloadLength);
if (TryParseChunkData(ref afterChunkStart, chunkState, binder, out message))
{
// Full chunked message processed in one call
input = afterChunkStart;
_logger?.LogDebug("TryParseMessage CHUNK_START + chunk data processed in single call");
return true;
}
// IMPORTANT: do NOT advance input when returning false.
// SignalR's contract is "advance only on success". If we advance here,
// the buffer state becomes inconsistent on re-submission.
// On next call, the buffer may re-present CHUNK_START bytes; the chunk-mode
// block above handles that via TrySkipRepresentedChunkStart.
_logger?.LogDebug("TryParseMessage CHUNK_START parsed, state added, waiting for chunk data (not advancing)");
return false;
}
return false;
}
/// <summary>
/// Detects if the buffer starts with a re-presented CHUNK_START frame pattern
/// ([INT32 length][CHUNK_START marker]). If so, advances <paramref name="input"/>
/// past the entire frame and returns true.
///
/// This guards against the case where SignalR's buffer management re-presents
/// bytes we logically consumed during a previous false-returning TryParseMessage call.
/// </summary>
private static bool TrySkipRepresentedChunkStart(ref ReadOnlySequence<byte> input)
{
if (input.Length < LengthPrefixSize + 1) return false;
Span<byte> header = stackalloc byte[LengthPrefixSize + 1];
input.Slice(0, LengthPrefixSize + 1).CopyTo(header);
int maybeLen = System.Buffers.Binary.BinaryPrimitives.ReadInt32LittleEndian(header.Slice(0, LengthPrefixSize));
byte maybeMarker = header[LengthPrefixSize];
if (maybeMarker != MsgAsyncChunkStart) return false;
if (maybeLen <= 0 || input.Length < LengthPrefixSize + maybeLen) return false;
input = input.Slice(LengthPrefixSize + maybeLen);
return true;
}
private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder)
{
if (payloadLength == 0)
return null;
// Mark end position so Parse* methods can check Remaining relative to payload
var payloadEnd = r.Consumed + payloadLength;
r.TryRead(out var msgType);
return msgType switch
{
MsgInvocation => ParseInvocation(ref r, binder),
MsgStreamInvocation => ParseStreamInvocation(ref r, binder),
MsgStreamItem => ParseStreamItem(ref r, binder),
MsgCompletion => ParseCompletion(ref r, binder),
MsgCancelInvocation => ParseCancelInvocation(ref r),
MsgPing => PingMessage.Instance,
MsgClose => ParseClose(ref r),
MsgAck => new AckMessage(ReadInt64(ref r)),
MsgSequence => new SequenceMessage(ReadInt64(ref r)),
MsgAsyncChunkStart => ParseAsyncChunkStart(ref r, binder),
_ => null
};
}
/// <summary>
/// Legacy diagnostic logger. Use ILogger via constructor instead.
/// </summary>
[Obsolete("Use ILogger via constructor parameter instead. This property will be removed in a future version.")]
public static Action<string>? DiagnosticLogger { get; set; }
[Conditional("DEBUG")]
private void LogDiagnostic(string message) => _logger?.LogDebug(message);
[Conditional("DEBUG")]
private void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType)
{
if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
var segmentCount = 0;
foreach (var _ in argSlice) segmentCount++;
_logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}",
argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name);
}
[Conditional("DEBUG")]
private void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining)
{
if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
var typeNames = new string[paramTypes.Count];
for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name;
_logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}",
target, paramTypes.Count, string.Join(", ", typeNames), remaining);
}
private HubMessage ParseInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadNullableString(ref r);
var target = ReadString(ref r);
var paramTypes = binder.GetParameterTypes(target);
LogParseInvocation(target, paramTypes, r.Remaining);
var args = ReadArguments(ref r, paramTypes);
var streamIds = ReadStringArray(ref r);
var headers = ReadHeaders(ref r);
var msg = streamIds is { Length: > 0 } ? new InvocationMessage(invocationId, target, args, streamIds) : ApplyInvocationId(new InvocationMessage(target, args), invocationId);
if (headers != null) SetHeaders(msg, headers);
return msg;
}
private HubMessage ParseStreamInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadString(ref r);
var target = ReadString(ref r);
var paramTypes = binder.GetParameterTypes(target);
var args = ReadArguments(ref r, paramTypes);
var streamIds = ReadStringArray(ref r);
var headers = ReadHeaders(ref r);
var msg = new StreamInvocationMessage(invocationId, target, args, streamIds);
if (headers != null) SetHeaders(msg, headers);
return msg;
}
private HubMessage ParseStreamItem(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadString(ref r);
var itemType = binder.GetStreamItemType(invocationId);
var item = ReadSingleArgument(ref r, itemType);
var headers = ReadHeaders(ref r);
var msg = new StreamItemMessage(invocationId, item);
if (headers != null) SetHeaders(msg, headers);
return msg;
}
private HubMessage ParseCompletion(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadString(ref r);
var error = ReadNullableString(ref r);
r.TryRead(out var hasResultByte);
var hasResult = hasResultByte == 1;
object? result = null;
if (hasResult)
{
var resultType = binder.GetReturnType(invocationId);
result = ReadSingleArgument(ref r, resultType);
}
var headers = ReadHeaders(ref r);
CompletionMessage msg;
if (error != null) msg = CompletionMessage.WithError(invocationId, error);
else if (hasResult) msg = CompletionMessage.WithResult(invocationId, result);
else msg = CompletionMessage.Empty(invocationId);
if (headers != null) SetHeaders(msg, headers);
return msg;
}
private static HubMessage ParseCancelInvocation(ref SequenceReader<byte> r)
{
var invocationId = ReadString(ref r);
var headers = ReadHeaders(ref r);
var msg = new CancelInvocationMessage(invocationId);
if (headers != null) SetHeaders(msg, headers);
return msg;
}
private static HubMessage ParseClose(ref SequenceReader<byte> r)
{
var error = ReadNullableString(ref r);
r.TryRead(out var reconnectByte);
var allowReconnect = reconnectByte == 1;
return new CloseMessage(error, allowReconnect);
}
#endregion
#region Chunked Protocol (AsyncSegment read)
/// <summary>
/// Processes CHUNK_DATA and CHUNK_END in chunk accumulation mode.
/// Called from TryParseMessage when an active AsyncChunkState exists for this connection.
/// Loops over all available chunks — critical because SignalR's while loop exits when
/// TryParseMessage returns false, and won't re-enter until new data arrives on the pipe.
/// </summary>
private bool TryParseChunkData(ref ReadOnlySequence<byte> input, AsyncChunkState state,
IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
{
message = null;
while (input.Length >= 1)
{
var firstByte = input.FirstSpan[0];
if (firstByte == MsgAsyncChunkData) // 201 — self-describing data chunk [201][UINT16 size][data]
{
// Need at least [201][UINT16]
if (input.Length < 3) return false;
// Read UINT16 chunk data size
var headerSlice = input.Slice(1, 2);
Span<byte> sizeBytes = stackalloc byte[2];
headerSlice.CopyTo(sizeBytes);
var chunkDataSize = System.Buffers.Binary.BinaryPrimitives.ReadUInt16LittleEndian(sizeBytes);
var totalNeeded = 3 + chunkDataSize; // header (3) + data
if (input.Length < totalNeeded) return false;
_logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length);
// Write chunk data to SegmentBufferReader for background deserialization
if (chunkDataSize > 0)
{
var dataSlice = input.Slice(3, chunkDataSize);
foreach (var segment in dataSlice)
state.Buffer.Write(segment.Span);
}
// Lazy start: begin background deserialization after first chunk is written.
// SegmentBufferReaderInput.Initialize reads the already-written data immediately.
// Browser fallback: skip Task.Run — SegmentBufferReader.WaitForData relies on
// ManualResetEventSlim.Wait which throws PlatformNotSupportedException on WASM.
// Instead, buffer all chunks and run the deserializer synchronously on CHUNK_END,
// where state.Buffer.Complete() has already been called and no wait is needed.
if (state.DeserTask == null && !IsBrowser)
{
_logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name);
var reader = state.Buffer;
var type = state.StreamedArgType;
var opts = _options;
state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(reader, type, opts));
}
input = input.Slice(totalNeeded);
state.ChunkFrameBytesConsumed += totalNeeded;
continue; // try next chunk immediately
}
if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data)
{
_logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion");
// Signal end of data → background deser task completes
state.Buffer.Complete();
object? deserializedArg = null;
try
{
if (state.DeserTask != null)
{
// Desktop / server: background task has been deserializing concurrently
// with chunk arrival (pipeline parallelism). Wait for its result here.
deserializedArg = state.DeserTask.GetAwaiter().GetResult();
}
else
{
// Browser (WASM) fallback: all chunks are buffered, state.Buffer.Complete()
// has been called above, so the synchronous deserializer reads through the
// completed buffer without any Monitor.Wait.
deserializedArg = AcBinaryDeserializer.Deserialize(
state.Buffer, state.StreamedArgType, _options);
}
if (_logger != null)
{
_logger.LogInformation("Deserialize end (chunked)");
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null");
}
}
catch (Exception ex)
{
_logger?.LogError(ex, "TryParseChunkData deserialization FAILED targetType={TargetType}", state.StreamedArgType.Name);
throw;
}
finally
{
_logger?.LogDebug("TryParseChunkData [202] cleanup: Buffer.Dispose + _chunkStates.Remove");
state.Buffer.Dispose();
_chunkStates.Remove(binder);
}
// Fill the placeholder in the stored message's args
FillStreamedArg(state, deserializedArg);
input = input.Slice(1); // consume the single [202] byte
message = state.PartialMessage;
return true;
}
// Unknown byte in chunk mode — break out (shouldn't happen)
_logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking. " +
"binderHash={BinderHash} inputLength={InputLength} " +
"state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} bufferWritePos={WritePos} bufferReadPos={ReadPos}",
firstByte,
binder.GetHashCode(),
input.Length,
state.StreamedArgType.Name,
state.DeserTask?.Status.ToString() ?? "null",
state.Buffer.WritePos,
state.Buffer.ReadPos);
break;
}
return false;
}
/// <summary>
/// Parses CHUNK_START: reads original message (with -1 marker for streamed arg),
/// creates SegmentBufferReader, stores state. Background deser task starts lazily on first chunk.
/// Returns null to signal "consumed bytes, no complete message yet".
/// </summary>
private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder)
{
r.TryRead(out var originalMsgType);
_logger?.LogDebug("ParseAsyncChunkStart innerMsgType={InnerMsgType}", originalMsgType);
// Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments
var partialMessage = originalMsgType switch
{
MsgInvocation => ParseInvocation(ref r, binder),
MsgStreamInvocation => ParseStreamInvocation(ref r, binder),
MsgStreamItem => ParseStreamItem(ref r, binder),
MsgCompletion => ParseCompletion(ref r, binder),
_ => null
};
if (partialMessage == null) return null;
// Find the placeholder arg and its target type
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
// Derived classes can override ResolveStreamedArgType to consult _currentHeaderContext
// (set by ReadHeader) or any other per-message state.
streamedType = ResolveStreamedArgType(streamedType);
_logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
streamedIndex, streamedType.Name);
var state = new AsyncChunkState
{
PartialMessage = partialMessage,
Args = args,
StreamedArgIndex = streamedIndex,
StreamedArgType = streamedType,
Buffer = new SegmentBufferReader(_options.BufferWriterChunkSize * 2, _logger)
// DeserTask started lazily in TryParseChunkData after first chunk is written
};
_chunkStates.AddOrUpdate(binder, state);
_logger?.LogDebug("ParseAsyncChunkStart _chunkStates.AddOrUpdate binderHash={BinderHash} streamedArgType={TargetType}",
binder.GetHashCode(), streamedType.Name);
return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData
}
/// <summary>
/// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array,
/// placeholder index, and the target deserialization type.
/// </summary>
private static (object?[] args, int index, Type type) FindStreamedArgSlot(HubMessage message, IInvocationBinder binder)
{
switch (message)
{
case InvocationMessage inv:
{
var paramTypes = binder.GetParameterTypes(inv.Target);
for (var i = 0; i < inv.Arguments.Length; i++)
{
if (!ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) continue;
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (inv.Arguments, i, type);
}
break;
}
case StreamInvocationMessage sinv:
{
var paramTypes = binder.GetParameterTypes(sinv.Target);
for (var i = 0; i < sinv.Arguments.Length; i++)
{
if (!ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) continue;
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (sinv.Arguments, i, type);
}
break;
}
case StreamItemMessage si:
{
if (ReferenceEquals(si.Item, StreamedArgPlaceholder))
{
// StreamItemMessage.Item is read-only, use a wrapper array
var args = new object?[] { si.Item };
var type = binder.GetStreamItemType(si.InvocationId!);
return (args, 0, type);
}
break;
}
case CompletionMessage comp:
{
if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder))
{
var args = new object?[] { comp.Result };
var type = binder.GetReturnType(comp.InvocationId!);
return (args, 0, type);
}
break;
}
}
return ([], -1, typeof(object));
}
/// <summary>
/// Replaces the StreamedArgPlaceholder with the deserialized value in the stored message.
/// </summary>
private static void FillStreamedArg(AsyncChunkState state, object? deserializedValue)
{
if (state.StreamedArgIndex < 0) return;
switch (state.PartialMessage)
{
case InvocationMessage inv:
inv.Arguments[state.StreamedArgIndex] = deserializedValue;
break;
case StreamInvocationMessage sinv:
sinv.Arguments[state.StreamedArgIndex] = deserializedValue;
break;
case StreamItemMessage:
// StreamItemMessage.Item has no public setter — need to create a new message
if (state.PartialMessage is StreamItemMessage si) state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue);
break;
case CompletionMessage:
// CompletionMessage.Result has no public setter — need to create a new message
if (state.PartialMessage is CompletionMessage comp) state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue);
break;
}
}
#endregion
#region Argument Serialization
private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes)
{
bw.WriteVarUInt((uint)arguments.Length);
for (var i = 0; i < arguments.Length; i++) WriteArgument(ref bw, output, arguments[i], ref externalBytes);
}
private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes)
{
// byte[] fast-path: size known upfront, write entirely through BWO
if (value is byte[] byteArray)
{
var isAcBinary = byteArray.Length >= 2
&& byteArray[0] == AcBinarySerializerOptions.FormatVersion
&& (byteArray[1] & 0xF0) == BinaryTypeCode.HeaderFlagsBase;
if (isAcBinary)
{
// Already AcBinary-serialized: write raw length + bytes, no tag wrapper
bw.WriteRaw(byteArray.Length);
}
else
{
// Raw byte[] (image, file, etc.): tag + raw bytes, no VarUInt (argLength implies size)
bw.WriteRaw(1 + byteArray.Length);
bw.WriteByte(BinaryTypeCode.ByteArray);
}
bw.WriteBytes(byteArray);
return;
}
// Bytes mode: serialize to byte[], write through BWO (no FlushAndReset needed)
if (_protocolMode == BinaryProtocolMode.Bytes)
{
var serialized = AcBinarySerializer.Serialize(value, _options);
bw.WriteRaw(serialized.Length);
bw.WriteBytes(serialized);
return;
}
// Segment mode: serialize directly to the pipe via BufferWriterBinaryOutput
// (AsyncSegment goes through WriteMessageChunked, never reaches here)
bw.FlushAndReset();
// Reserve arg length prefix directly on the pipe
var argLenSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize);
var argBytes = AcBinarySerializer.Serialize(value, output, _options);
Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes);
externalBytes += LengthPrefixSize + argBytes;
}
private object?[] ReadArguments(ref SequenceReader<byte> r, IReadOnlyList<Type> paramTypes)
{
var count = (int)ReadVarUInt(ref r);
LogDiagnostic($"[AcBinaryHubProtocol] ReadArguments count={count}; remaining={r.Remaining}");
var args = new object?[count];
for (var i = 0; i < count; i++)
{
var targetType = i < paramTypes.Count ? paramTypes[i] : typeof(object);
LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}");
args[i] = ReadSingleArgument(ref r, targetType);
OnArgumentRead(args[i], i);
}
return args;
}
protected virtual void OnArgumentRead(object? value, int index) { }
/// <summary>
/// Override to resolve typeof(object) to a concrete type (e.g., from SignalParams).
/// Called after FindStreamedArgSlot in chunked deserialization.
/// </summary>
protected virtual Type ResolveStreamedArgType(Type binderType) => binderType;
/// <summary>
/// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer.
/// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[].
/// SignalDataType enables eager deserialization of response data to the server's actual type.
/// </summary>
protected virtual object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
{
r.TryReadLittleEndian(out int argLength);
if (argLength == 0) return null;
// AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization
if (argLength == -1)
{
_logger?.LogTrace("ReadSingleArgument streamed arg marker (-1) → placeholder");
return StreamedArgPlaceholder;
}
// Null marker check
if (argLength == 1)
{
r.TryPeek(out var marker);
if (marker == 0) { r.Advance(1); return null; }
}
// Slice argument from pipe sequence — zero-copy reference
var argSlice = r.UnreadSequence.Slice(0, argLength);
r.Advance(argLength);
LogReadSingleArgument(argSlice, argLength, targetType);
// byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag →
// strip tag, rest is raw payload. No VarUInt length (argLength implies size).
var argReader = new SequenceReader<byte>(argSlice);
if (argReader.TryPeek(out var tag) && tag == BinaryTypeCode.ByteArray)
{
return SequenceToByteArray(argSlice.Slice(1));
}
// Bytes mode: linearize to byte[] → ArrayBinaryInput (fastest deser, no segment overhead)
if (_protocolMode == BinaryProtocolMode.Bytes)
{
var bytes = SequenceToByteArray(argSlice);
return AcBinaryDeserializer.Deserialize(bytes, targetType, _options);
}
return DeserializeFromSequence(argSlice, targetType, _options);
}
/// <summary>
/// Returns raw byte[] from the pipe sequence without any deserialization.
/// Zero-copy when single-segment (TryGetArray), copies only for rare multi-segment.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
{
if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg) && seg.Offset == 0 && seg.Count == seg.Array!.Length)
return seg.Array;
return data.ToArray();
}
/// <summary>
/// Deserializes from a ReadOnlySequence via AcBinaryDeserializer.
/// Single-segment: zero-copy via ArrayBinaryInput. Multi-segment: SequenceBinaryInput (no copy).
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static object? DeserializeFromSequence(ReadOnlySequence<byte> data, Type targetType, AcBinarySerializerOptions options)
=> AcBinaryDeserializer.Deserialize(data, targetType, options);
#endregion
#region Write Framing Helpers
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value)
{
if (value == null)
{
bw.WriteByte(0);
return;
}
bw.WriteByte(1);
bw.WriteStringUtf8(value);
}
private static void WriteStringArray(ref BufferWriterBinaryOutput bw, string[]? array)
{
if (array == null || array.Length == 0)
{
bw.WriteVarUInt(0);
return;
}
bw.WriteVarUInt((uint)array.Length);
for (var i = 0; i < array.Length; i++)
bw.WriteStringUtf8(array[i]);
}
private static void WriteHeaders(ref BufferWriterBinaryOutput bw, IDictionary<string, string>? headers)
{
if (headers == null || headers.Count == 0)
{
bw.WriteVarUInt(0);
return;
}
bw.WriteVarUInt((uint)headers.Count);
foreach (var kv in headers)
{
bw.WriteStringUtf8(kv.Key);
bw.WriteStringUtf8(kv.Value);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int VarUIntSize(uint value)
{
if (value < 0x80) return 1;
if (value < 0x4000) return 2;
if (value < 0x200000) return 3;
if (value < 0x10000000) return 4;
return 5;
}
#endregion
#region Sequence Read Helpers
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static long ReadInt64(ref SequenceReader<byte> r)
{
r.TryReadLittleEndian(out long v);
return v;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static uint ReadVarUInt(ref SequenceReader<byte> r)
{
uint value = 0;
var shift = 0;
while (r.TryRead(out var b))
{
value |= (uint)(b & 0x7F) << shift;
if ((b & 0x80) == 0)
return value;
shift += 7;
}
return value;
}
protected static string ReadString(ref SequenceReader<byte> r)
{
var byteCount = (int)ReadVarUInt(ref r);
if (byteCount == 0)
return string.Empty;
r.TryReadExact(byteCount, out var bytes);
return bytes.IsSingleSegment
? Encoding.UTF8.GetString(bytes.FirstSpan)
: Encoding.UTF8.GetString(bytes.ToArray());
}
protected static string? ReadNullableString(ref SequenceReader<byte> r)
{
r.TryRead(out var marker);
return marker == 0 ? null : ReadString(ref r);
}
private static string[]? ReadStringArray(ref SequenceReader<byte> r)
{
var count = (int)ReadVarUInt(ref r);
if (count == 0)
return null;
var array = new string[count];
for (var i = 0; i < count; i++)
array[i] = ReadString(ref r);
return array;
}
private static Dictionary<string, string>? ReadHeaders(ref SequenceReader<byte> r)
{
if (r.Remaining == 0)
return null;
var count = (int)ReadVarUInt(ref r);
if (count == 0)
return null;
var headers = new Dictionary<string, string>(count, StringComparer.Ordinal);
for (var i = 0; i < count; i++)
{
var key = ReadString(ref r);
var value = ReadString(ref r);
headers[key] = value;
}
return headers;
}
#endregion
#region Helpers
private static InvocationMessage ApplyInvocationId(InvocationMessage msg, string? invocationId)
{
if (invocationId != null)
return new InvocationMessage(invocationId, msg.Target, msg.Arguments);
return msg;
}
private static void SetHeaders(HubMessage msg, Dictionary<string, string> headers)
{
if (msg is HubInvocationMessage invMsg)
invMsg.Headers = headers;
}
#endregion
}