AyCode.Core/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs

1440 lines
59 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
namespace AyCode.Services.SignalRs;
/// <summary>
/// Custom SignalR hub protocol using AcBinarySerializer for wire format.
/// Eliminates JSON+Base64 overhead by serializing all HubMessages directly to binary.
///
/// Wire format per message:
/// [4 bytes: payload length (little-endian)] [payload bytes]
///
/// Payload structure:
/// [1 byte: message type] [message-specific fields serialized via AcBinary]
///
/// Message types map 1:1 to SignalR HubMessageType values.
/// Arguments are serialized individually with an INT32 length prefix each,
/// enabling deferred deserialization via IHubProtocol's binder pattern.
///
/// Write path: BufferWriterBinaryOutput for zero virtual dispatch on the hot path.
/// Argument payloads serialized directly to the pipe via AcBinarySerializer (zero-copy write).
///
/// Read path: SequenceReader&lt;byte&gt; reads directly from the pipe's ReadOnlySequence.
/// Argument deserialization uses the pipe's backing byte[] via TryGetArray (zero-copy read).
/// </summary>
public class AcBinaryHubProtocol : IHubProtocol
{
// Size in bytes of the INT32 payload-length prefix that precedes every framed message.
private const int LengthPrefixSize = 4;
// Message type markers (matching HubMessageType enum values).
// Each is written as the first payload byte of a non-chunked message.
private const byte MsgInvocation = 1;
private const byte MsgStreamItem = 2;
private const byte MsgCompletion = 3;
private const byte MsgStreamInvocation = 4;
private const byte MsgCancelInvocation = 5;
private const byte MsgPing = 6;
private const byte MsgClose = 7;
private const byte MsgAck = 8;
private const byte MsgSequence = 9;
// Chunked protocol framing for AsyncSegment mode:
//   200 — first payload byte of the length-prefixed CHUNK_START frame (followed by the original message type byte)
//   201 — leads each CHUNK_DATA frame: [201][UINT16 size][data]
//   202 — single-byte CHUNK_END marker
private const byte MsgAsyncChunkStart = 200;
private const byte MsgAsyncChunkData = 201;
private const byte MsgAsyncChunkEnd = 202;
/// <summary>Sentinel object placed in the args array for the streamed argument (replaced after chunk deserialization).</summary>
protected static readonly object StreamedArgPlaceholder = new();
/// <summary>
/// True when running on a browser (WebAssembly) runtime. Cached at type-load because
/// the value is invariant per process and the check is used on hot paths.
/// <para>
/// Browser implications:
/// <list type="bullet">
/// <item>Send path: <c>AsyncSegment</c> is unsupported (sync-over-async flush blocks the single UI thread).</item>
/// <item>Receive path: when chunked wire arrives, background <c>Task.Run</c> is skipped;
/// the deserializer runs synchronously on <c>CHUNK_END</c> over the already-buffered data
/// (<see cref="SegmentBufferReader"/>'s <c>ManualResetEventSlim.Wait()</c> would throw
/// <see cref="PlatformNotSupportedException"/>).</item>
/// </list>
/// </para>
/// </summary>
private static readonly bool IsBrowser = OperatingSystem.IsBrowser();
/// <summary>
/// Serializer options used for reading and writing. Replaceable at runtime through <see cref="Options"/>;
/// declared <c>volatile</c> so a replacement is observed by subsequent reads.
/// </summary>
protected volatile AcBinarySerializerOptions _options;
/// <summary>
/// Protocol mode copied from <see cref="AcBinaryHubProtocolOptions.ProtocolMode"/> at construction.
/// <see cref="BinaryProtocolMode.AsyncSegment"/> is what enables the chunked write path in <see cref="WriteMessage"/>.
/// </summary>
protected readonly BinaryProtocolMode _protocolMode;
/// <summary>
/// Optional logger copied from <see cref="AcBinaryHubProtocolOptions.Logger"/>; may be <c>null</c>,
/// so call sites use null-conditional access or an explicit null check.
/// </summary>
protected readonly ILogger? _logger;
/// <summary>
/// AsyncSegment per-chunk flush synchronization — see <see cref="AcBinaryHubProtocolOptions.FlushPolicy"/>.
/// </summary>
protected readonly FlushPolicy _flushPolicy;
/// <summary>
/// Per-flush wait limit — see <see cref="AcBinaryHubProtocolOptions.FlushTimeout"/>.
/// Guaranteed positive or <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> by <see cref="AcBinaryHubProtocolOptions.Validate"/>.
/// </summary>
protected readonly TimeSpan _flushTimeout;
/// <summary>
/// Per-connection chunk accumulation state. Key is IInvocationBinder (per-connection, GC-friendly).
/// Always initialized regardless of ProtocolMode — any client can receive chunked data from an AsyncSegment server.
/// </summary>
private readonly ConditionalWeakTable<IInvocationBinder, AsyncChunkState> _chunkStates;
/// <summary>
/// Mutable accumulation state for one in-flight chunked message, stored per <see cref="IInvocationBinder"/>
/// in <c>_chunkStates</c>. Read and updated by <c>TryParseMessage</c> / <c>TryParseChunkData</c>.
/// </summary>
private sealed class AsyncChunkState
{
// The message being assembled while chunk data is still arriving.
// NOTE(review): presumably populated when CHUNK_START is parsed — the code that sets it is outside this view.
public HubMessage PartialMessage = null!;
// Argument array of the partial message.
// NOTE(review): presumably holds StreamedArgPlaceholder at StreamedArgIndex until CHUNK_END — confirm.
public object?[] Args = null!;
// Position of the streamed argument inside Args.
public int StreamedArgIndex;
// Target type handed to AcBinaryDeserializer when the streamed argument is deserialized.
public Type StreamedArgType = null!;
// Receives the data bytes of every CHUNK_DATA frame; Complete() is called on it at CHUNK_END.
public SegmentBufferReader Buffer = null!;
// Background deserialization task started after the first CHUNK_DATA frame on non-browser runtimes;
// stays null on the browser, where deserialization runs synchronously at CHUNK_END.
public Task<object?>? DeserTask;
/// <summary>
/// Per-binder header context — the opaque object returned by <see cref="ReadHeader"/> for
/// the currently chunked message. Persisted across CHUNK_START → CHUNK_DATA × N → CHUNK_END
/// boundaries inside the per-binder <see cref="AsyncChunkState"/> entry, so derived classes
/// can consume it during chunked deserialization without sharing state across connections.
/// </summary>
public object? HeaderContext;
/// <summary>
/// Total bytes of chunk frame data already consumed from the input stream
/// (including [201][UINT16] framing headers + data bytes).
/// Used to skip already-processed chunks when SignalR re-presents the buffer
/// after a false-returning TryParseMessage call.
/// </summary>
public int ChunkFrameBytesConsumed;
}
/// <summary>
/// Parameterless constructor — delegates to the primary constructor with a freshly constructed
/// <see cref="AcBinaryHubProtocolOptions"/>, i.e. whatever defaults that type defines
/// (documented as <see cref="BinaryProtocolMode.Bytes"/>, 4 KB buffer, 10 s flush timeout, "acbinary" name —
/// NOTE(review): confirm against <see cref="AcBinaryHubProtocolOptions"/>).
/// Mainly for tests and simple scenarios. For production, pass an explicit
/// <see cref="AcBinaryHubProtocolOptions"/> or configure via DI.
/// </summary>
public AcBinaryHubProtocol() : this(new AcBinaryHubProtocolOptions()) { }
/// <summary>
/// Primary constructor; every setting is taken from <paramref name="options"/>.
/// <see cref="AcBinaryHubProtocolOptions.Validate"/> runs first and is expected to throw on invalid
/// configuration (incl. WebAssembly + AsyncSegment send-path).
/// </summary>
/// <param name="options">
/// Protocol configuration. Note that its <c>SerializerOptions</c> instance is adopted as-is and its
/// <c>BufferWriterChunkSize</c> is overwritten with <c>options.BufferSize</c>.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
public AcBinaryHubProtocol(AcBinaryHubProtocolOptions options)
{
    if (options is null)
    {
        throw new ArgumentNullException(nameof(options));
    }

    options.Validate();

    // Adopt the caller's serializer options and align the writer chunk size with the protocol buffer size.
    var serializerOptions = options.SerializerOptions;
    serializerOptions.BufferWriterChunkSize = options.BufferSize;
    _options = serializerOptions;

    _chunkStates = new ConditionalWeakTable<IInvocationBinder, AsyncChunkState>();
    Name = options.Name;
    _flushTimeout = options.FlushTimeout;
    _flushPolicy = options.FlushPolicy;
    _logger = options.Logger;
    _protocolMode = options.ProtocolMode;

    _logger?.LogInformation(
        "AcBinaryHubProtocol initialized name={Name} mode={ProtocolMode} isBrowser={IsBrowser} chunkSize={ChunkSize} initCap={InitCap} flushPolicy={FlushPolicy} flushTimeoutMs={FlushTimeoutMs} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}",
        Name,
        _protocolMode,
        IsBrowser,
        _options.BufferWriterChunkSize,
        _options.InitialBufferCapacity,
        _flushPolicy,
        _flushTimeout.TotalMilliseconds,
        _options.UseGeneratedCode,
        _options.WireMode,
        _options.UseStringInterning,
        _options.UseCompression);
}
/// <summary>
/// Serializer options currently in effect; may be swapped while the protocol is in use.
/// Backed by a volatile field, so a newly assigned instance is picked up by the next message.
/// </summary>
public AcBinarySerializerOptions Options
{
    get
    {
        return _options;
    }
    set
    {
        _options = value;
    }
}
/// <summary>Protocol name sent in SignalR handshake. Set via <see cref="AcBinaryHubProtocolOptions.Name"/>. Default: <c>"acbinary"</c>.</summary>
public string Name { get; } = "acbinary";
/// <summary>Protocol version implemented by this class; <see cref="IsVersionSupported"/> accepts any version up to and including it.</summary>
public int Version => 1;
/// <summary>Always <see cref="Microsoft.AspNetCore.Connections.TransferFormat.Binary"/> — the wire format is raw bytes.</summary>
public TransferFormat TransferFormat => TransferFormat.Binary;
/// <summary>
/// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask.
/// Fast-path: if already completed (no backpressure), returns directly without Task allocation.
/// Slow-path: blocks with <see cref="_flushTimeout"/> — throws <see cref="TimeoutException"/>
/// if the flush does not complete within the timeout (protects against slow/stuck/disconnected
/// consumers holding the server thread indefinitely).
/// <para>
/// <see cref="AcBinaryHubProtocolOptions.Validate"/> guarantees <c>_flushTimeout</c> is either
/// positive or <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> (which <c>Task.Wait</c>
/// natively treats as "wait forever"), so no explicit zero-check is needed here.
/// </para>
/// <para>
/// A flush that faults or is canceled surfaces its original exception (not an
/// <see cref="AggregateException"/> wrapper).
/// </para>
/// </summary>
/// <param name="vt">The pending flush; consumed exactly once by this method.</param>
/// <returns>The completed <see cref="FlushResult"/>.</returns>
/// <exception cref="TimeoutException">The flush did not complete within <see cref="_flushTimeout"/>.</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private FlushResult SyncFlush(ValueTask<FlushResult> vt)
{
    if (vt.IsCompletedSuccessfully) return vt.Result;

    var task = vt.AsTask();
    bool completed;
    try
    {
        completed = task.Wait(_flushTimeout);
    }
    catch (AggregateException)
    {
        // Task.Wait wraps the failure of a faulted/canceled task in AggregateException.
        // The task is complete at this point; GetResult() below rethrows the original exception.
        completed = true;
    }

    if (!completed)
    {
        throw new TimeoutException($"PipeWriter.FlushAsync exceeded {_flushTimeout.TotalSeconds:F1}s — " + "consumer may be too slow, stuck, or disconnected.");
    }

    return task.GetAwaiter().GetResult();
}
/// <summary>Reports whether <paramref name="version"/> is not newer than <see cref="Version"/>.</summary>
/// <param name="version">Protocol version requested by the peer.</param>
/// <returns><c>true</c> when <paramref name="version"/> is less than or equal to <see cref="Version"/>.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsVersionSupported(int version)
{
    return Version >= version;
}
#region Extensibility Hooks
/// <summary>
/// Called right after the message type byte (both chunked and non-chunked paths) for
/// Invocation, StreamInvocation, StreamItem and Completion messages. CancelInvocation, Ping,
/// Close, Ack and Sequence messages are written without invoking this hook.
/// Derived protocols can write extra header fields here (e.g., a type AQN for untyped args).
/// <para>
/// Default implementation writes nothing — base protocol is fully generic and has no per-message
/// extra state. Derived classes <b>must</b> read exactly the same bytes in <see cref="ReadHeader"/>.
/// </para>
/// </summary>
/// <param name="bw">Output writer (same one used for the message payload).</param>
/// <param name="message">The message being written.</param>
/// <param name="streamedArg">
/// When the chunked path activates, this is the actual argument being streamed (so the derived
/// class can use its concrete runtime type). <c>null</c> for non-chunked messages.
/// </param>
protected virtual void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
{
// Base: no extra header.
}
/// <summary>
/// Reads the per-message header written by <see cref="WriteHeader"/> on the sender side.
/// Called right after the message type byte has been consumed.
/// <para>
/// Returns an opaque context object. The <c>Parse*</c> methods expose it through their
/// <c>out headerContext</c> parameter and forward it to <c>ReadArguments</c> /
/// <c>ReadSingleArgument</c>, so it is available while the arguments are deserialized.
/// It is not stored in an instance field.
/// Default implementation returns <c>null</c>.
/// </para>
/// </summary>
/// <param name="r">Reader positioned immediately after the message type byte.</param>
/// <returns>Derived-class specific context, or <c>null</c> when there is no header.</returns>
protected virtual object? ReadHeader(ref SequenceReader<byte> r) => null;
#endregion
#region WriteMessage
/// <summary>
/// Serializes <paramref name="message"/> into a standalone buffer: [INT32 length][payload].
/// </summary>
/// <param name="message">The hub message to serialize.</param>
/// <returns>The written bytes, backed by the internal <see cref="ArrayBufferWriter{T}"/>.</returns>
public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
{
    // +LengthPrefixSize: keeps the first chunk-sized request from growing the ArrayBufferWriter.
    var writer = new ArrayBufferWriter<byte>(_options.BufferWriterChunkSize + LengthPrefixSize);
    WriteMessage(message, writer);

    var written = writer.WrittenMemory;

    // WriteMessage stores the length through a span obtained before the payload is written.
    // ArrayBufferWriter replaces its backing array when it grows, so for payloads larger than
    // the initial capacity that write can land in the discarded array. The writer holds exactly
    // one frame here (a non-PipeWriter output never takes the chunked path), so the payload
    // length is everything after the prefix — stamp it on the final buffer.
    if (written.Length >= LengthPrefixSize)
    {
        System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(
            MemoryMarshal.AsMemory(written).Span,
            written.Length - LengthPrefixSize);
    }

    return written;
}
/// <summary>
/// Writes <paramref name="message"/> to <paramref name="output"/> as [INT32 little-endian length][payload].
/// In <see cref="BinaryProtocolMode.AsyncSegment"/> mode, messages with streamable arguments written to a
/// <see cref="PipeWriter"/> are delegated to <see cref="WriteMessageChunked"/> instead.
/// </summary>
/// <param name="message">The hub message to serialize.</param>
/// <param name="output">Destination buffer writer.</param>
/// <exception cref="HubException">The message type is not one this protocol knows how to write.</exception>
public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
{
    _logger?.LogInformation("Serialize start");
    // AsyncSegment: chunked protocol framing for messages with streamable arguments
    if (_protocolMode == BinaryProtocolMode.AsyncSegment
        && output is PipeWriter pipeWriter
        && HasStreamableArgs(message))
    {
        WriteMessageChunked(message, pipeWriter);
        return;
    }
    // Reserve outer length prefix directly on the pipe (before BWO takes over).
    // NOTE(review): this span is written after Advance and after further writes; that relies on the
    // writer keeping the memory alive and in place until the prefix is patched below.
    var lengthSpan = output.GetSpan(LengthPrefixSize);
    output.Advance(LengthPrefixSize);
    var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize);
    // Bytes written to 'output' by the argument writers that are not reflected in bw.Position.
    var externalBytes = 0;
    switch (message)
    {
        case InvocationMessage m:
            WriteInvocation(ref bw, output, m, ref externalBytes);
            break;
        case StreamInvocationMessage m:
            WriteStreamInvocation(ref bw, output, m, ref externalBytes);
            break;
        case StreamItemMessage m:
            WriteStreamItem(ref bw, output, m, ref externalBytes);
            break;
        case CompletionMessage m:
            WriteCompletion(ref bw, output, m, ref externalBytes);
            break;
        case CancelInvocationMessage m:
            WriteCancelInvocation(ref bw, m);
            break;
        case PingMessage:
            bw.WriteByte(MsgPing);
            break;
        case CloseMessage m:
            WriteClose(ref bw, m);
            break;
        case AckMessage m:
            bw.WriteByte(MsgAck);
            bw.WriteRaw(m.SequenceId);
            break;
        case SequenceMessage m:
            bw.WriteByte(MsgSequence);
            bw.WriteRaw(m.SequenceId);
            break;
        default:
            throw new HubException($"Unexpected message type: {message.GetType().Name}");
    }
    var totalPayload = bw.Position + externalBytes;
    bw.Flush();
    // Explicit little-endian write: matches the documented wire format and the
    // TryReadLittleEndian call on the read side, independent of host endianness.
    System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(lengthSpan, totalPayload);
    _logger?.LogInformation("Serialize end totalSentSize={TotalSentSize}", LengthPrefixSize + totalPayload);
    if (_logger?.IsEnabled(LogLevel.Debug) == true)
        _logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload);
}
/// <summary>
/// Writes an <see cref="InvocationMessage"/> payload in the order
/// type byte → derived-class header → nullable invocation id → target → arguments → stream ids → headers,
/// which is the order <see cref="ParseInvocation"/> reads them back.
/// </summary>
/// <param name="bw">Buffered output for the payload.</param>
/// <param name="output">Underlying writer, passed through to the argument writer.</param>
/// <param name="m">The message to write.</param>
/// <param name="externalBytes">Passed through to <c>WriteArguments</c>; <see cref="WriteMessage"/> adds it to <c>bw.Position</c> when computing the frame length.</param>
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgInvocation);
WriteHeader(ref bw, m, streamedArg: null);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
/// <summary>
/// Writes a <see cref="StreamInvocationMessage"/> payload in the order
/// type byte → derived-class header → invocation id → target → arguments → stream ids → headers,
/// which is the order <see cref="ParseStreamInvocation"/> reads them back.
/// Unlike <see cref="WriteInvocation"/>, the invocation id is written as a non-nullable string.
/// </summary>
/// <param name="bw">Buffered output for the payload.</param>
/// <param name="output">Underlying writer, passed through to the argument writer.</param>
/// <param name="m">The message to write.</param>
/// <param name="externalBytes">Passed through to <c>WriteArguments</c>; <see cref="WriteMessage"/> adds it to <c>bw.Position</c> when computing the frame length.</param>
private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamInvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamInvocation);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
/// <summary>
/// Writes a <see cref="StreamItemMessage"/> payload in the order
/// type byte → derived-class header → invocation id → item → headers,
/// which is the order <see cref="ParseStreamItem"/> reads them back.
/// </summary>
/// <param name="bw">Buffered output for the payload.</param>
/// <param name="output">Underlying writer, passed through to the argument writer.</param>
/// <param name="m">The message to write.</param>
/// <param name="externalBytes">Passed through to <c>WriteArgument</c>; <see cref="WriteMessage"/> adds it to <c>bw.Position</c> when computing the frame length.</param>
private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamItemMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamItem);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
WriteArgument(ref bw, output, m.Item, ref externalBytes);
WriteHeaders(ref bw, m.Headers);
}
/// <summary>
/// Writes a <see cref="CompletionMessage"/> payload:
/// type byte → derived-class header → invocation id → nullable error → has-result flag (1/0)
/// → result (only when the flag is 1) → headers.
/// </summary>
/// <param name="bw">Buffered output for the payload.</param>
/// <param name="output">Underlying writer, passed through to the argument writer.</param>
/// <param name="m">The message to write.</param>
/// <param name="externalBytes">Passed through to <c>WriteArgument</c>; the caller adds it to <c>bw.Position</c> when computing the frame length.</param>
private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, CompletionMessage m, ref int externalBytes)
{
    bw.WriteByte(MsgCompletion);
    WriteHeader(ref bw, m, streamedArg: null);
    bw.WriteStringUtf8(m.InvocationId!);
    WriteNullableString(ref bw, m.Error);

    if (m.HasResult)
    {
        // Flag first, then the result payload.
        bw.WriteByte((byte)1);
        WriteArgument(ref bw, output, m.Result, ref externalBytes);
    }
    else
    {
        bw.WriteByte((byte)0);
    }

    WriteHeaders(ref bw, m.Headers);
}
/// <summary>
/// Writes a <see cref="CancelInvocationMessage"/> payload: type byte → invocation id → headers.
/// No derived-class header is written for this message type (<see cref="WriteHeader"/> is not called),
/// matching <see cref="ParseCancelInvocation"/>, which does not call <see cref="ReadHeader"/>.
/// </summary>
/// <param name="bw">Buffered output for the payload.</param>
/// <param name="m">The message to write.</param>
private static void WriteCancelInvocation(ref BufferWriterBinaryOutput bw, CancelInvocationMessage m)
{
bw.WriteByte(MsgCancelInvocation);
bw.WriteStringUtf8(m.InvocationId!);
WriteHeaders(ref bw, m.Headers);
}
/// <summary>
/// Writes a <see cref="CloseMessage"/> payload: type byte → nullable error → allow-reconnect flag (1/0).
/// No derived-class header and no headers dictionary are written for this message type.
/// </summary>
/// <param name="bw">Buffered output for the payload.</param>
/// <param name="m">The message to write.</param>
private static void WriteClose(ref BufferWriterBinaryOutput bw, CloseMessage m)
{
    bw.WriteByte(MsgClose);
    WriteNullableString(ref bw, m.Error);

    if (m.AllowReconnect)
    {
        bw.WriteByte((byte)1);
    }
    else
    {
        bw.WriteByte((byte)0);
    }
}
#endregion
#region Chunked Protocol (AsyncSegment write)
/// <summary>
/// Decides whether <paramref name="message"/> carries at least one argument eligible for the chunked
/// protocol, i.e. a value that is neither <c>null</c> nor a <c>byte[]</c>.
/// Message types without arguments never qualify.
/// </summary>
/// <param name="message">The message about to be written.</param>
/// <returns><c>true</c> when the chunked write path should be used.</returns>
private static bool HasStreamableArgs(HubMessage message)
{
    switch (message)
    {
        case InvocationMessage invocation:
            return HasNonByteArrayArg(invocation.Arguments);
        case StreamInvocationMessage streamInvocation:
            return HasNonByteArrayArg(streamInvocation.Arguments);
        case StreamItemMessage streamItem:
            return streamItem.Item is not null and not byte[];
        case CompletionMessage completion:
            return completion.HasResult && completion.Result is not null and not byte[];
        default:
            return false;
    }
}
/// <summary>
/// Reports whether <paramref name="args"/> contains any element that is neither <c>null</c> nor a <c>byte[]</c>.
/// </summary>
/// <param name="args">Argument array to inspect; may be empty.</param>
/// <returns><c>true</c> if at least one such element exists; otherwise <c>false</c>.</returns>
private static bool HasNonByteArrayArg(object?[] args)
{
    foreach (var candidate in args)
    {
        if (candidate is null or byte[])
        {
            continue;
        }

        return true;
    }

    return false;
}
/// <summary>
/// Picks the argument that will be streamed for <paramref name="message"/> together with its index.
/// For invocation-style messages this is the last argument that is neither <c>null</c> nor <c>byte[]</c>;
/// for stream items and completions it is the single item/result at index 0.
/// </summary>
/// <param name="message">The message being written through the chunked path.</param>
/// <returns>The value and its index, or <c>(null, -1)</c> for message types without arguments.</returns>
private static (object? value, int index) GetStreamedArg(HubMessage message)
{
    switch (message)
    {
        case InvocationMessage invocation:
            return GetLastNonByteArrayArg(invocation.Arguments);
        case StreamInvocationMessage streamInvocation:
            return GetLastNonByteArrayArg(streamInvocation.Arguments);
        case StreamItemMessage streamItem:
            return (streamItem.Item, 0);
        case CompletionMessage completion:
            return (completion.Result, 0);
        default:
            return (null, -1);
    }
}
/// <summary>
/// Scans <paramref name="args"/> from the end and returns the first element found that is neither
/// <c>null</c> nor a <c>byte[]</c>, along with its position.
/// </summary>
/// <param name="args">Argument array to inspect; may be empty.</param>
/// <returns>The matching value and index, or <c>(null, -1)</c> when no element qualifies.</returns>
private static (object? value, int index) GetLastNonByteArrayArg(object?[] args)
{
    var position = args.Length;
    while (--position >= 0)
    {
        var candidate = args[position];
        if (candidate is not null and not byte[])
        {
            return (candidate, position);
        }
    }

    return (null, -1);
}
/// <summary>
/// Writes a message using chunked protocol framing for AsyncSegment mode.
/// CHUNK_START: standard SignalR framed message with INT32 -1 for the streamed arg.
/// CHUNK_DATA: [201][UINT16 size][data] per chunk (written by AsyncPipeWriterOutput, zero-copy).
/// CHUNK_END: [202] (1 byte, no data — all data already committed by output).
/// </summary>
/// <param name="message">
/// The message to write. Only Invocation, StreamInvocation, StreamItem and Completion messages are
/// handled; <see cref="HasStreamableArgs"/> returns <c>false</c> for every other type, so the caller
/// never routes those here.
/// </param>
/// <param name="pipeWriter">Destination pipe; flushed synchronously after CHUNK_START and after CHUNK_END.</param>
/// <exception cref="TimeoutException">A synchronous flush exceeded the configured flush timeout.</exception>
private void WriteMessageChunked(HubMessage message, PipeWriter pipeWriter)
{
    var (streamedArg, streamedArgIndex) = GetStreamedArg(message);
    if (_logger?.IsEnabled(LogLevel.Debug) == true)
        _logger.LogDebug("WriteMessageChunked {MessageType} streamedArgIndex={StreamedArgIndex} streamedArgType={StreamedArgType}",
            message.GetType().Name, streamedArgIndex, streamedArg?.GetType().Name ?? "null");
    int chunkStartPayload;
    var dataBytes = 0;
    // --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) ---
    {
        var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize);
        pipeWriter.Advance(LengthPrefixSize);
        var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize);
        var externalBytes = 0;
        bw.WriteByte(MsgAsyncChunkStart);
        // Write original message body with INT32 -1 for the streamed arg
        switch (message)
        {
            case InvocationMessage m:
                bw.WriteByte(MsgInvocation);
                WriteHeader(ref bw, m, streamedArg);
                WriteNullableString(ref bw, m.InvocationId);
                bw.WriteStringUtf8(m.Target);
                WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
                WriteStringArray(ref bw, m.StreamIds);
                WriteHeaders(ref bw, m.Headers);
                break;
            case StreamInvocationMessage m:
                bw.WriteByte(MsgStreamInvocation);
                WriteHeader(ref bw, m, streamedArg);
                bw.WriteStringUtf8(m.InvocationId!);
                bw.WriteStringUtf8(m.Target);
                WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
                WriteStringArray(ref bw, m.StreamIds);
                WriteHeaders(ref bw, m.Headers);
                break;
            case StreamItemMessage m:
                bw.WriteByte(MsgStreamItem);
                WriteHeader(ref bw, m, streamedArg);
                bw.WriteStringUtf8(m.InvocationId!);
                bw.WriteRaw(-1); // streamed arg marker
                WriteHeaders(ref bw, m.Headers);
                break;
            case CompletionMessage m:
                bw.WriteByte(MsgCompletion);
                WriteHeader(ref bw, m, streamedArg);
                bw.WriteStringUtf8(m.InvocationId!);
                WriteNullableString(ref bw, m.Error);
                bw.WriteByte(1); // hasResult = true
                bw.WriteRaw(-1); // streamed arg marker
                WriteHeaders(ref bw, m.Headers);
                break;
        }
        chunkStartPayload = bw.Position + externalBytes;
        bw.Flush();
        // Explicit little-endian write: matches the documented wire format and the
        // little-endian read on the receiving side, independent of host endianness.
        System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(lengthSpan, chunkStartPayload);
        _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload);
    }
    // The CHUNK_START frame is pushed out before any chunk data is produced.
    SyncFlush(pipeWriter.FlushAsync());
    // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
    if (streamedArg != null)
    {
        dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options, _flushPolicy, _flushTimeout);
        _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes);
    }
    // --- CHUNK_END [202] ---
    var endByte = pipeWriter.GetSpan(1);
    endByte[0] = MsgAsyncChunkEnd;
    pipeWriter.Advance(1);
    SyncFlush(pipeWriter.FlushAsync());
    _logger?.LogTrace("WriteMessageChunked CHUNK_END written");
    // Total wire bytes = length prefix (4) + CHUNK_START payload + CHUNK_DATA frames + CHUNK_END (1)
    // Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data.
    // This figure is computed for logging only; it does not influence what is written.
    var chunkSize = _options.BufferWriterChunkSize;
    var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0;
    var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1;
    _logger?.LogInformation("Serialize end (chunked) dataBytes={DataBytes} chunkCount={ChunkCount} totalSentSize={TotalSentSize}",
        dataBytes, chunkCount, totalSentSize);
}
/// <summary>
/// Emits the argument list of a CHUNK_START frame: a var-uint count followed by each argument.
/// The argument at <paramref name="streamedArgIndex"/> is replaced by an INT32 <c>-1</c> marker;
/// all others go through the regular argument writer.
/// </summary>
/// <param name="bw">Buffered output for the payload.</param>
/// <param name="output">Underlying writer, passed through to the argument writer.</param>
/// <param name="arguments">All arguments of the message.</param>
/// <param name="streamedArgIndex">Index of the argument whose data follows in CHUNK_DATA frames.</param>
/// <param name="externalBytes">Passed through to <c>WriteArgument</c>; the caller adds it to <c>bw.Position</c> when computing the frame length.</param>
private void WriteArgumentsChunked(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output,
    object?[] arguments, int streamedArgIndex, ref int externalBytes)
{
    var count = arguments.Length;
    bw.WriteVarUInt((uint)count);

    for (var position = 0; position < count; position++)
    {
        if (position != streamedArgIndex)
        {
            WriteArgument(ref bw, output, arguments[position], ref externalBytes);
        }
        else
        {
            // Placeholder for the argument that is streamed separately.
            bw.WriteRaw(-1);
        }
    }
}
#endregion
#region TryParseMessage
/// <summary>
/// Attempts to parse one complete hub message from <paramref name="input"/>.
/// <para>
/// When a chunked transfer is in progress for <paramref name="binder"/>, the input is handed to
/// <c>TryParseChunkData</c> (after skipping any re-presented CHUNK_START / already-consumed chunk bytes).
/// Otherwise a single [INT32 length][payload] frame is parsed.
/// </para>
/// <para>
/// NOTE(review): a zero-length frame or an unknown message type makes <c>ParseMessage</c> return
/// <c>null</c>; in that case this method returns <c>false</c> without consuming the frame — confirm
/// this is the intended handling.
/// </para>
/// </summary>
/// <param name="input">Buffered bytes; advanced past the consumed frame(s) only when the method returns <c>true</c>.</param>
/// <param name="binder">Supplies parameter/return types and keys the per-connection chunk state.</param>
/// <param name="message">The parsed message when the method returns <c>true</c>.</param>
/// <returns><c>true</c> when a complete message was produced; <c>false</c> when more data is needed.</returns>
/// <exception cref="System.IO.InvalidDataException">The frame carries a negative length prefix.</exception>
public virtual bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
{
    message = null;
    // AsyncSegment chunk mode
    if (_chunkStates.TryGetValue(binder, out var chunkState))
    {
        // Guard against buffer re-presentation: if SignalR re-submitted the same buffer
        // (because our previous fallthrough returned false without advancing),
        // the buffer may still contain:
        // 1. The already-processed CHUNK_START frame
        // 2. Already-processed CHUNK_DATA frames (if we processed any partial chunks previously)
        // Skip both to avoid duplicate writes to state.Buffer.
        if (TrySkipRepresentedChunkStart(ref input))
        {
            _logger?.LogDebug("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length);
            // Also skip already-consumed chunk frame bytes (re-presented along with CHUNK_START)
            if (chunkState.ChunkFrameBytesConsumed > 0)
            {
                if (input.Length < chunkState.ChunkFrameBytesConsumed)
                {
                    _logger?.LogWarning("TryParseMessage re-presentation inconsistency: expected >= {Expected} already-consumed bytes but only {Actual} in buffer",
                        chunkState.ChunkFrameBytesConsumed, input.Length);
                    return false;
                }
                input = input.Slice(chunkState.ChunkFrameBytesConsumed);
                _logger?.LogDebug("TryParseMessage skipped {Bytes} already-consumed chunk frame bytes, remainingInput={RemainingInput}",
                    chunkState.ChunkFrameBytesConsumed, input.Length);
            }
        }
        if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
            _logger.LogDebug("TryParseMessage chunk mode active binderHash={BinderHash} inputLength={InputLength} firstByte={FirstByte}",
                binder.GetHashCode(), input.Length, input.Length > 0 ? input.FirstSpan[0] : (byte)0);
        return TryParseChunkData(ref input, chunkState, binder, out message);
    }
    // Normal path
    var reader = new SequenceReader<byte>(input);
    if (!reader.TryReadLittleEndian(out int payloadLength))
        return false;
    // The prefix comes straight off the wire. A negative value would pass the "enough bytes" check
    // below and then be used as a slice offset, so reject it up front as a malformed frame.
    if (payloadLength < 0)
        throw new System.IO.InvalidDataException($"Invalid message length prefix: {payloadLength}.");
    if (reader.Remaining < payloadLength)
        return false;
    _logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length);
    _logger?.LogInformation("Deserialize start");
    message = ParseMessage(ref reader, payloadLength, binder);
    if (message != null)
    {
        input = input.Slice(LengthPrefixSize + payloadLength);
        _logger?.LogInformation("Deserialize end");
        if (_logger?.IsEnabled(LogLevel.Debug) == true) _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name);
        return true;
    }
    // CHUNK_START consumed but no complete HubMessage yet (chunk mode just activated).
    // Try to process any remaining chunk data already in the buffer.
    if (_chunkStates.TryGetValue(binder, out chunkState))
    {
        var afterChunkStart = input.Slice(LengthPrefixSize + payloadLength);
        if (TryParseChunkData(ref afterChunkStart, chunkState, binder, out message))
        {
            // Full chunked message processed in one call
            input = afterChunkStart;
            _logger?.LogDebug("TryParseMessage CHUNK_START + chunk data processed in single call");
            return true;
        }
        // IMPORTANT: do NOT advance input when returning false.
        // SignalR's contract is "advance only on success". If we advance here,
        // the buffer state becomes inconsistent on re-submission.
        // On next call, the buffer may re-present CHUNK_START bytes; the chunk-mode
        // block above handles that via TrySkipRepresentedChunkStart.
        _logger?.LogDebug("TryParseMessage CHUNK_START parsed, state added, waiting for chunk data (not advancing)");
        return false;
    }
    return false;
}
/// <summary>
/// Detects if the buffer starts with a re-presented CHUNK_START frame pattern
/// ([INT32 length][CHUNK_START marker]). If so, advances <paramref name="input"/>
/// past the entire frame and returns true.
///
/// This guards against the case where SignalR's buffer management re-presents
/// bytes we logically consumed during a previous false-returning TryParseMessage call.
/// <para>
/// NOTE(review): detection is purely positional (fifth byte equals the CHUNK_START marker). A
/// CHUNK_DATA frame whose bytes happen to match that pattern would also be accepted — confirm that
/// callers cannot reach this method with such input.
/// </para>
/// </summary>
/// <param name="input">Buffered bytes; sliced past the frame only when the method returns <c>true</c>.</param>
/// <returns><c>true</c> when a complete CHUNK_START frame was found at the start and skipped.</returns>
private static bool TrySkipRepresentedChunkStart(ref ReadOnlySequence<byte> input)
{
    const int probeSize = LengthPrefixSize + 1;
    if (input.Length < probeSize) return false;

    // Copy out [INT32 length][marker]; the sequence may span several segments.
    Span<byte> header = stackalloc byte[probeSize];
    input.Slice(0, probeSize).CopyTo(header);

    if (header[LengthPrefixSize] != MsgAsyncChunkStart) return false;

    int frameLength = System.Buffers.Binary.BinaryPrimitives.ReadInt32LittleEndian(header.Slice(0, LengthPrefixSize));
    if (frameLength <= 0) return false;

    // 64-bit arithmetic: prefix + length must not wrap around for lengths close to int.MaxValue,
    // otherwise the availability check passes and Slice receives a negative offset.
    long frameEnd = (long)LengthPrefixSize + frameLength;
    if (input.Length < frameEnd) return false;

    input = input.Slice(frameEnd);
    return true;
}
/// <summary>
/// Reads the message type byte and dispatches to the matching <c>Parse*</c> method.
/// </summary>
/// <param name="r">Reader positioned at the first payload byte (just after the length prefix).</param>
/// <param name="payloadLength">Length from the frame prefix; only used to short-circuit empty frames.</param>
/// <param name="binder">Supplies parameter/return types for argument deserialization.</param>
/// <returns>
/// The parsed message, or <c>null</c> when the payload is empty, the type byte is unknown, or the
/// frame was a CHUNK_START that did not yet yield a message.
/// </returns>
private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder)
{
    if (payloadLength == 0)
        return null;
    // The caller has verified that payloadLength (> 0 here) bytes are available, so this read succeeds.
    r.TryRead(out var msgType);
    // The header context (out _) is intentionally discarded on the non-chunked path —
    // it lives only on the stack frame of the Parse* call and is consumed inline by the
    // ReadArguments / ReadSingleArgument calls inside that frame. No instance state means
    // no race even when this protocol instance is shared across threads (NuGet contract).
    return msgType switch
    {
        MsgInvocation => ParseInvocation(ref r, binder, out _),
        MsgStreamInvocation => ParseStreamInvocation(ref r, binder, out _),
        MsgStreamItem => ParseStreamItem(ref r, binder, out _),
        MsgCompletion => ParseCompletion(ref r, binder, out _),
        MsgCancelInvocation => ParseCancelInvocation(ref r),
        MsgPing => PingMessage.Instance,
        MsgClose => ParseClose(ref r),
        MsgAck => new AckMessage(ReadInt64(ref r)),
        MsgSequence => new SequenceMessage(ReadInt64(ref r)),
        MsgAsyncChunkStart => ParseAsyncChunkStart(ref r, binder),
        _ => null
    };
}
/// <summary>
/// Legacy diagnostic logger. Use ILogger via constructor instead.
/// Kept only for source compatibility; marked obsolete and scheduled for removal.
/// NOTE(review): nothing in the visible part of this class reads this property — confirm it is unused before removing.
/// </summary>
[Obsolete("Use ILogger via constructor parameter instead. This property will be removed in a future version.")]
public static Action<string>? DiagnosticLogger { get; set; }
/// <summary>
/// Writes a free-form diagnostic line at Debug level. Calls are compiled only in DEBUG builds.
/// </summary>
/// <param name="message">Text to log verbatim.</param>
[Conditional("DEBUG")]
private void LogDiagnostic(string message)
{
    // Pass the text as an argument, not as the template: the logging formatter treats '{' / '}' in a
    // template as placeholders, so arbitrary text containing braces must not be used as one.
    _logger?.LogDebug("{Message}", message);
}
/// <summary>
/// DEBUG-only trace of a single argument read: its length, whether the slice is a single segment,
/// how many segments it spans, and the target type name. Does nothing unless Debug logging is enabled.
/// </summary>
/// <param name="argSlice">The slice of the input that holds the argument bytes.</param>
/// <param name="argLength">Declared length of the argument.</param>
/// <param name="targetType">Type the argument is about to be deserialized into.</param>
[Conditional("DEBUG")]
private void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType)
{
    var log = _logger;
    if (log is null || !log.IsEnabled(LogLevel.Debug))
    {
        return;
    }

    // Count the segments by walking the sequence once.
    var segments = 0;
    var walker = argSlice.GetEnumerator();
    while (walker.MoveNext())
    {
        segments++;
    }

    log.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}",
        argLength, argSlice.IsSingleSegment, segments, targetType.Name);
}
/// <summary>
/// DEBUG-only trace of an invocation parse: target name, parameter count, the comma-separated
/// parameter type names, and the bytes remaining in the reader. Does nothing unless Debug logging is enabled.
/// </summary>
/// <param name="target">Hub method name read from the wire.</param>
/// <param name="paramTypes">Parameter types resolved by the binder for <paramref name="target"/>.</param>
/// <param name="remaining">Bytes left in the reader at the time of the call.</param>
[Conditional("DEBUG")]
private void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining)
{
    var log = _logger;
    if (log is null || !log.IsEnabled(LogLevel.Debug))
    {
        return;
    }

    // Build "TypeA, TypeB, ..." from the simple type names.
    var joinedNames = new StringBuilder();
    var count = paramTypes.Count;
    for (var position = 0; position < count; position++)
    {
        if (position > 0)
        {
            joinedNames.Append(", ");
        }

        joinedNames.Append(paramTypes[position].Name);
    }

    log.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}",
        target, paramTypes.Count, joinedNames.ToString(), remaining);
}
/// <summary>
/// Reads an invocation payload (after the type byte): derived-class header → nullable invocation id →
/// target → arguments → stream ids → headers, and builds the corresponding <see cref="InvocationMessage"/>.
/// </summary>
/// <param name="r">Reader positioned just after the message type byte.</param>
/// <param name="binder">Resolves the parameter types for the target method.</param>
/// <param name="headerContext">Receives the object returned by <see cref="ReadHeader"/>.</param>
/// <returns>The parsed invocation message.</returns>
private HubMessage ParseInvocation(ref SequenceReader<byte> r, IInvocationBinder binder, out object? headerContext)
{
    headerContext = ReadHeader(ref r);

    var id = ReadNullableString(ref r);
    var methodName = ReadString(ref r);

    var parameterTypes = binder.GetParameterTypes(methodName);
    LogParseInvocation(methodName, parameterTypes, r.Remaining);

    var arguments = ReadArguments(ref r, parameterTypes, headerContext);
    var streams = ReadStringArray(ref r);
    var headerMap = ReadHeaders(ref r);

    // With stream ids the full constructor is used; otherwise the id is applied afterwards.
    var parsed = streams is { Length: > 0 }
        ? new InvocationMessage(id, methodName, arguments, streams)
        : ApplyInvocationId(new InvocationMessage(methodName, arguments), id);

    if (headerMap != null)
    {
        SetHeaders(parsed, headerMap);
    }

    return parsed;
}
/// <summary>
/// Reads a stream-invocation payload (after the type byte): derived-class header → invocation id →
/// target → arguments → stream ids → headers, and builds a <see cref="StreamInvocationMessage"/>.
/// </summary>
/// <param name="r">Reader positioned just after the message type byte.</param>
/// <param name="binder">Resolves the parameter types for the target method.</param>
/// <param name="headerContext">Receives the object returned by <see cref="ReadHeader"/>.</param>
/// <returns>The parsed stream-invocation message.</returns>
private HubMessage ParseStreamInvocation(ref SequenceReader<byte> r, IInvocationBinder binder, out object? headerContext)
{
    headerContext = ReadHeader(ref r);

    var id = ReadString(ref r);
    var methodName = ReadString(ref r);
    var parameterTypes = binder.GetParameterTypes(methodName);

    var arguments = ReadArguments(ref r, parameterTypes, headerContext);
    var streams = ReadStringArray(ref r);
    var headerMap = ReadHeaders(ref r);

    var parsed = new StreamInvocationMessage(id, methodName, arguments, streams);
    if (headerMap != null)
    {
        SetHeaders(parsed, headerMap);
    }

    return parsed;
}
/// <summary>
/// Reads a stream-item payload (after the type byte): derived-class header → invocation id → item → headers,
/// and builds a <see cref="StreamItemMessage"/>.
/// </summary>
/// <param name="r">Reader positioned just after the message type byte.</param>
/// <param name="binder">Resolves the item type for the invocation id.</param>
/// <param name="headerContext">Receives the object returned by <see cref="ReadHeader"/>.</param>
/// <returns>The parsed stream-item message.</returns>
private HubMessage ParseStreamItem(ref SequenceReader<byte> r, IInvocationBinder binder, out object? headerContext)
{
    headerContext = ReadHeader(ref r);

    var id = ReadString(ref r);
    var elementType = binder.GetStreamItemType(id);
    var element = ReadSingleArgument(ref r, elementType, headerContext);
    var headerMap = ReadHeaders(ref r);

    var parsed = new StreamItemMessage(id, element);
    if (headerMap != null)
    {
        SetHeaders(parsed, headerMap);
    }

    return parsed;
}
/// <summary>
/// Reads a completion payload (after the type byte): derived-class header → invocation id → nullable error →
/// has-result flag → result (only when the flag is 1) → headers, and builds a <see cref="CompletionMessage"/>.
/// An error takes precedence over a result when choosing which factory to use.
/// </summary>
/// <param name="r">Reader positioned just after the message type byte.</param>
/// <param name="binder">Resolves the return type for the invocation id.</param>
/// <param name="headerContext">Receives the object returned by <see cref="ReadHeader"/>.</param>
/// <returns>The parsed completion message.</returns>
private HubMessage ParseCompletion(ref SequenceReader<byte> r, IInvocationBinder binder, out object? headerContext)
{
    headerContext = ReadHeader(ref r);

    var id = ReadString(ref r);
    var failure = ReadNullableString(ref r);

    r.TryRead(out var resultFlag);
    var carriesResult = resultFlag == 1;

    object? payload = null;
    if (carriesResult)
    {
        payload = ReadSingleArgument(ref r, binder.GetReturnType(id), headerContext);
    }

    var headerMap = ReadHeaders(ref r);

    CompletionMessage parsed = failure != null
        ? CompletionMessage.WithError(id, failure)
        : carriesResult
            ? CompletionMessage.WithResult(id, payload)
            : CompletionMessage.Empty(id);

    if (headerMap != null)
    {
        SetHeaders(parsed, headerMap);
    }

    return parsed;
}
/// <summary>
/// Reads a cancel-invocation payload (after the type byte): invocation id → headers.
/// No derived-class header is read for this message type.
/// </summary>
/// <param name="r">Reader positioned just after the message type byte.</param>
/// <returns>The parsed <see cref="CancelInvocationMessage"/>.</returns>
private static HubMessage ParseCancelInvocation(ref SequenceReader<byte> r)
{
    var id = ReadString(ref r);
    var headerMap = ReadHeaders(ref r);

    var parsed = new CancelInvocationMessage(id);
    if (headerMap != null)
    {
        SetHeaders(parsed, headerMap);
    }

    return parsed;
}
/// <summary>
/// Reads a close payload (after the type byte): nullable error → allow-reconnect flag,
/// where a flag byte of 1 means reconnect is allowed.
/// </summary>
/// <param name="r">Reader positioned just after the message type byte.</param>
/// <returns>The parsed <see cref="CloseMessage"/>.</returns>
private static HubMessage ParseClose(ref SequenceReader<byte> r)
{
    var failure = ReadNullableString(ref r);
    r.TryRead(out var flag);
    return new CloseMessage(failure, flag == 1);
}
#endregion
#region Chunked Protocol (AsyncSegment read)
/// <summary>
/// Processes CHUNK_DATA and CHUNK_END in chunk accumulation mode.
/// Called from TryParseMessage when an active AsyncChunkState exists for this connection.
/// Loops over all available chunks — critical because SignalR's while loop exits when
/// TryParseMessage returns false, and won't re-enter until new data arrives on the pipe.
/// </summary>
/// <param name="input">Unconsumed pipe bytes; sliced forward past every frame that is fully processed.</param>
/// <param name="state">Per-binder chunk state created by CHUNK_START (buffer, target type, partial message).</param>
/// <param name="binder">Binder whose entry is removed from <c>_chunkStates</c> once CHUNK_END has been handled.</param>
/// <param name="message">The completed message (with the streamed argument filled in) when the method returns <c>true</c>; otherwise <c>null</c>.</param>
/// <returns>
/// <c>true</c> once a CHUNK_END frame has been processed; <c>false</c> when a frame is still incomplete,
/// the input is exhausted, or an unexpected marker byte is encountered.
/// </returns>
private bool TryParseChunkData(ref ReadOnlySequence<byte> input, AsyncChunkState state,
IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
{
message = null;
while (input.Length >= 1)
{
var firstByte = input.FirstSpan[0];
if (firstByte == MsgAsyncChunkData) // 201 — self-describing data chunk [201][UINT16 size][data]
{
// Need at least [201][UINT16]
if (input.Length < 3) return false;
// Read UINT16 chunk data size (copied to the stack because the two bytes may straddle segments)
var headerSlice = input.Slice(1, 2);
Span<byte> sizeBytes = stackalloc byte[2];
headerSlice.CopyTo(sizeBytes);
var chunkDataSize = System.Buffers.Binary.BinaryPrimitives.ReadUInt16LittleEndian(sizeBytes);
var totalNeeded = 3 + chunkDataSize; // header (3) + data
// Incomplete frame: leave input untouched so the same frame is re-examined when more bytes arrive.
if (input.Length < totalNeeded) return false;
_logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length);
// Write chunk data to SegmentBufferReader for background deserialization
if (chunkDataSize > 0)
{
var dataSlice = input.Slice(3, chunkDataSize);
foreach (var segment in dataSlice)
state.Buffer.Write(segment.Span);
}
// Lazy start: begin background deserialization after first chunk is written.
// SegmentBufferReaderInput.Initialize reads the already-written data immediately.
// Browser fallback: skip Task.Run — SegmentBufferReader.WaitForData relies on
// ManualResetEventSlim.Wait which throws PlatformNotSupportedException on WASM.
// Instead, buffer all chunks and run the deserializer synchronously on CHUNK_END,
// where state.Buffer.Complete() has already been called and no wait is needed.
if (state.DeserTask == null && !IsBrowser)
{
_logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name);
// Copy to locals so the lambda captures these values rather than the state object / this.
var reader = state.Buffer;
var type = state.StreamedArgType;
var opts = _options;
state.DeserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(reader, type, opts));
}
input = input.Slice(totalNeeded);
state.ChunkFrameBytesConsumed += totalNeeded;
continue; // try next chunk immediately
}
if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data)
{
_logger?.LogDebug("TryParseChunkData [202] CHUNK_END — signaling completion");
// Signal end of data → background deser task completes
state.Buffer.Complete();
object? deserializedArg = null;
try
{
if (state.DeserTask != null)
{
// Desktop / server: background task has been deserializing concurrently
// with chunk arrival (pipeline parallelism). Wait for its result here.
// This blocks the calling thread until the task finishes.
deserializedArg = state.DeserTask.GetAwaiter().GetResult();
}
else
{
// Browser (WASM) fallback: all chunks are buffered into a single contiguous byte[]
// inside SegmentBufferReader. Use ArrayBinaryInput via the offset-aware overload —
// strictly faster than SegmentBufferReaderInput here (JIT eliminates
// TryAdvanceSegment, no volatile reads, no cross-boundary branching).
deserializedArg = AcBinaryDeserializer.Deserialize(
state.Buffer.Buffer,
0,
state.Buffer.WritePos,
state.StreamedArgType,
_options);
}
_logger?.LogInformation("Deserialize end (chunked)");
if (_logger?.IsEnabled(LogLevel.Debug) == true)
_logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null");
}
catch (Exception ex)
{
_logger?.LogError(ex, "TryParseChunkData deserialization FAILED targetType={TargetType}", state.StreamedArgType.Name);
throw;
}
finally
{
// Runs on success and on failure: the buffer is released and chunk mode ends for this binder either way.
_logger?.LogDebug("TryParseChunkData [202] cleanup: Buffer.Dispose + _chunkStates.Remove");
state.Buffer.Dispose();
_chunkStates.Remove(binder);
}
// Fill the placeholder in the stored message's args
FillStreamedArg(state, deserializedArg);
// NOTE(review): unlike the [201] path, ChunkFrameBytesConsumed is not incremented for this byte — confirm that is intended.
input = input.Slice(1); // consume the single [202] byte
message = state.PartialMessage;
return true;
}
// Unknown byte in chunk mode — break out (shouldn't happen).
// The byte is not consumed and the chunk state stays registered for this binder.
_logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking. " +
"binderHash={BinderHash} inputLength={InputLength} " +
"state: streamedArgType={TargetType} deserTaskStatus={TaskStatus} bufferWritePos={WritePos} bufferReadPos={ReadPos}",
firstByte,
binder.GetHashCode(),
input.Length,
state.StreamedArgType.Name,
state.DeserTask?.Status.ToString() ?? "null",
state.Buffer.WritePos,
state.Buffer.ReadPos);
break;
}
return false;
}
/// <summary>
/// Handles a CHUNK_START frame. The frame embeds an ordinary message whose streamed argument is
/// replaced by a -1 length marker; that message is parsed, the placeholder slot is located, and an
/// <c>AsyncChunkState</c> is registered for the binder so the following frames are routed to
/// <c>TryParseChunkData</c>. Background deserialization is started lazily there, not here.
/// </summary>
/// <param name="r">Reader positioned at the inner message-type byte.</param>
/// <param name="binder">Binder used for type lookup and as the key of the chunk-state table.</param>
/// <returns>Always <c>null</c>: bytes were consumed but no complete message exists yet.</returns>
private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder)
{
    r.TryRead(out var innerType);
    _logger?.LogDebug("ParseAsyncChunkStart innerMsgType={InnerMsgType}", innerType);

    // The inner message is parsed by the regular Parse* routines; ReadArguments turns the -1 marker
    // into StreamedArgPlaceholder. The header context handed back by Parse* is kept in a local and
    // then stored on the per-binder state, so it survives CHUNK_START → CHUNK_DATA × N → CHUNK_END
    // without any shared instance state (race-free on a shared protocol instance).
    object? headerContext = null;
    HubMessage? partial = innerType switch
    {
        MsgInvocation => ParseInvocation(ref r, binder, out headerContext),
        MsgStreamInvocation => ParseStreamInvocation(ref r, binder, out headerContext),
        MsgStreamItem => ParseStreamItem(ref r, binder, out headerContext),
        MsgCompletion => ParseCompletion(ref r, binder, out headerContext),
        _ => null,
    };

    // Unsupported inner type or a parser that produced nothing: no chunk mode.
    if (partial == null) return null;

    // Locate the placeholder and the type the binder declares for that slot, then give derived
    // classes the chance to refine the type using the header context.
    var slot = FindStreamedArgSlot(partial, binder);
    var resolvedType = ResolveStreamedArgType(slot.type, headerContext);
    _logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
        slot.index, resolvedType.Name);

    var chunkState = new AsyncChunkState
    {
        PartialMessage = partial,
        Args = slot.args,
        StreamedArgIndex = slot.index,
        StreamedArgType = resolvedType,
        HeaderContext = headerContext,
        // DeserTask is left unset; TryParseChunkData starts it after the first chunk is written.
        Buffer = new SegmentBufferReader(_options.BufferWriterChunkSize * 2, _logger)
    };
    _chunkStates.AddOrUpdate(binder, chunkState);
    _logger?.LogDebug("ParseAsyncChunkStart _chunkStates.AddOrUpdate binderHash={BinderHash} streamedArgType={TargetType}",
        binder.GetHashCode(), resolvedType.Name);

    // Chunk mode is now active; the next TryParseMessage call goes to TryParseChunkData.
    return null;
}
/// <summary>
/// Locates the <c>StreamedArgPlaceholder</c> inside a parsed message and reports where it lives.
/// </summary>
/// <param name="message">The partially parsed message that may contain the placeholder.</param>
/// <param name="binder">Source of the declared parameter / stream-item / return types.</param>
/// <returns>
/// The argument array holding the placeholder, its index, and the type to deserialize into.
/// For stream-item and completion messages a one-element wrapper array is returned with index 0.
/// When no placeholder is found the result is an empty array, index -1 and <c>typeof(object)</c>.
/// </returns>
private static (object?[] args, int index, Type type) FindStreamedArgSlot(HubMessage message, IInvocationBinder binder)
{
    switch (message)
    {
        case InvocationMessage invocation:
        {
            var declaredTypes = binder.GetParameterTypes(invocation.Target);
            var slot = IndexOfPlaceholder(invocation.Arguments);
            if (slot >= 0) return (invocation.Arguments, slot, TypeAt(declaredTypes, slot));
            break;
        }
        case StreamInvocationMessage streamInvocation:
        {
            var declaredTypes = binder.GetParameterTypes(streamInvocation.Target);
            var slot = IndexOfPlaceholder(streamInvocation.Arguments);
            if (slot >= 0) return (streamInvocation.Arguments, slot, TypeAt(declaredTypes, slot));
            break;
        }
        case StreamItemMessage streamItem:
        {
            if (!ReferenceEquals(streamItem.Item, StreamedArgPlaceholder)) break;
            // The item is not an array element, so it is wrapped in a single-slot array.
            var wrapper = new object?[] { streamItem.Item };
            var itemType = binder.GetStreamItemType(streamItem.InvocationId!);
            return (wrapper, 0, itemType);
        }
        case CompletionMessage completion:
        {
            if (!completion.HasResult || !ReferenceEquals(completion.Result, StreamedArgPlaceholder)) break;
            var wrapper = new object?[] { completion.Result };
            var resultType = binder.GetReturnType(completion.InvocationId!);
            return (wrapper, 0, resultType);
        }
    }

    return ([], -1, typeof(object));

    // First index whose element is the placeholder instance (reference comparison), or -1.
    static int IndexOfPlaceholder(object?[] arguments)
    {
        for (var position = 0; position < arguments.Length; position++)
        {
            if (ReferenceEquals(arguments[position], StreamedArgPlaceholder)) return position;
        }
        return -1;
    }

    // Declared type for the slot, falling back to object when the binder lists fewer parameters.
    static Type TypeAt(IReadOnlyList<Type> types, int position)
        => position < types.Count ? types[position] : typeof(object);
}
/// <summary>
/// Replaces the StreamedArgPlaceholder with the deserialized value in the stored message.
/// </summary>
/// <remarks>
/// Invocation-style messages are patched in place through their argument array. Stream-item and
/// completion messages cannot be patched (their payload has no public setter), so a replacement
/// message is created and stored on <paramref name="state"/>; the headers of the original message
/// are carried over to the replacement so they are not lost on the chunked path.
/// </remarks>
/// <param name="state">Chunk state holding the partial message and the placeholder index.</param>
/// <param name="deserializedValue">The fully deserialized streamed argument (may be <c>null</c>).</param>
private static void FillStreamedArg(AsyncChunkState state, object? deserializedValue)
{
    // Index -1 means no placeholder was found at CHUNK_START; nothing to fill.
    if (state.StreamedArgIndex < 0) return;

    switch (state.PartialMessage)
    {
        case InvocationMessage inv:
            inv.Arguments[state.StreamedArgIndex] = deserializedValue;
            break;

        case StreamInvocationMessage sinv:
            sinv.Arguments[state.StreamedArgIndex] = deserializedValue;
            break;

        case StreamItemMessage si:
            // StreamItemMessage.Item has no public setter — build a new message, keeping the headers.
            state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue)
            {
                Headers = si.Headers
            };
            break;

        case CompletionMessage comp:
        {
            // CompletionMessage.Result has no public setter — build a new message, keeping the headers.
            var completed = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue);
            completed.Headers = comp.Headers;
            state.PartialMessage = completed;
            break;
        }
    }
}
#endregion
#region Argument Serialization
/// <summary>
/// Writes the argument count as a VarUInt, then each argument in order via <c>WriteArgument</c>.
/// </summary>
/// <param name="bw">Buffered output used for the count and for arguments written through it.</param>
/// <param name="output">Underlying writer, passed through to <c>WriteArgument</c>.</param>
/// <param name="arguments">Arguments to serialize.</param>
/// <param name="externalBytes">Running byte total passed through to <c>WriteArgument</c>, which updates it.</param>
private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes)
{
    bw.WriteVarUInt((uint)arguments.Length);

    foreach (var argument in arguments)
    {
        WriteArgument(ref bw, output, argument, ref externalBytes);
    }
}
/// <summary>
/// Writes one argument as <c>[INT32 length][payload]</c>, choosing the cheapest path for the value:
/// <list type="bullet">
/// <item><c>byte[]</c> that already looks AcBinary-serialized: raw length + bytes.</item>
/// <item>Any other <c>byte[]</c>: length + ByteArray tag + raw bytes.</item>
/// <item>Bytes mode: serialize to a temporary <c>byte[]</c>, then length + bytes.</item>
/// <item>Segment mode: serialize straight to <paramref name="output"/> and back-fill the length prefix.</item>
/// </list>
/// </summary>
/// <param name="bw">Buffered output used for the paths that know the size upfront.</param>
/// <param name="output">Underlying writer used directly in segment mode.</param>
/// <param name="value">The argument value.</param>
/// <param name="externalBytes">Incremented by the bytes written directly to <paramref name="output"/> (segment mode only).</param>
private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes)
{
    // byte[] fast-path: size known upfront, write entirely through BWO
    if (value is byte[] byteArray)
    {
        var isAcBinary = byteArray.Length >= 2
                         && byteArray[0] == AcBinarySerializerOptions.FormatVersion
                         && (byteArray[1] & 0xF0) == BinaryTypeCode.HeaderFlagsBase;
        if (isAcBinary)
        {
            // Already AcBinary-serialized: write raw length + bytes, no tag wrapper
            bw.WriteRaw(byteArray.Length);
        }
        else
        {
            // Raw byte[] (image, file, etc.): tag + raw bytes, no VarUInt (argLength implies size)
            bw.WriteRaw(1 + byteArray.Length);
            bw.WriteByte(BinaryTypeCode.ByteArray);
        }
        bw.WriteBytes(byteArray);
        return;
    }

    // Bytes mode: serialize to byte[], write through BWO (no FlushAndReset needed)
    if (_protocolMode == BinaryProtocolMode.Bytes)
    {
        var serialized = AcBinarySerializer.Serialize(value, _options);
        bw.WriteRaw(serialized.Length);
        bw.WriteBytes(serialized);
        return;
    }

    // Segment mode: serialize directly to the pipe via BufferWriterBinaryOutput
    // (AsyncSegment goes through WriteMessageChunked, never reaches here)
    bw.FlushAndReset();

    // Reserve arg length prefix directly on the pipe
    // NOTE(review): the span is written after Advance and further writes by the serializer;
    // this relies on the writer keeping that memory valid until it is flushed — confirm for the writers in use.
    var argLenSpan = output.GetSpan(LengthPrefixSize);
    output.Advance(LengthPrefixSize);

    var argBytes = AcBinarySerializer.Serialize(value, output, _options);

    // The reader decodes this prefix with TryReadLittleEndian, so write it explicitly little-endian
    // instead of in the platform's native byte order.
    System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(argLenSpan, argBytes);
    externalBytes += LengthPrefixSize + argBytes;
}
/// <summary>
/// Reads the VarUInt argument count followed by that many arguments.
/// </summary>
/// <param name="r">Reader positioned at the argument count.</param>
/// <param name="paramTypes">Declared parameter types; positions beyond its length are read as <c>object</c>.</param>
/// <param name="headerContext">Opaque per-message context forwarded to <see cref="ReadSingleArgument"/>.</param>
/// <returns>The decoded arguments, in wire order.</returns>
/// <exception cref="InvalidDataException">
/// The encoded count is negative after conversion or larger than the number of bytes left in the message.
/// </exception>
private object?[] ReadArguments(ref SequenceReader<byte> r, IReadOnlyList<Type> paramTypes, object? headerContext)
{
    var count = (int)ReadVarUInt(ref r);
    LogDiagnostic($"[AcBinaryHubProtocol] ReadArguments count={count}; remaining={r.Remaining}");

    // The count comes off the wire and sizes the allocation below. Every argument occupies at least
    // one byte, so a count above the remaining byte total cannot be genuine — reject it instead of
    // allocating an attacker-chosen array size.
    if (count < 0 || count > r.Remaining)
        throw new InvalidDataException($"Invalid argument count {count}: only {r.Remaining} byte(s) remain in the message.");

    var args = new object?[count];
    for (var i = 0; i < count; i++)
    {
        var targetType = i < paramTypes.Count ? paramTypes[i] : typeof(object);
        LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}");
        args[i] = ReadSingleArgument(ref r, targetType, headerContext);
        OnArgumentRead(args[i], i);
    }
    return args;
}
/// <summary>
/// Extension hook called by <c>ReadArguments</c> right after each argument has been read.
/// The base implementation does nothing.
/// </summary>
/// <param name="value">The argument value that was just read (may be <c>null</c>).</param>
/// <param name="index">Zero-based position of the argument within the message.</param>
protected virtual void OnArgumentRead(object? value, int index) { }
/// <summary>
/// Extension point for choosing the concrete type of a streamed (chunked) argument, for example to
/// replace <c>typeof(object)</c> with something more specific. It is called during CHUNK_START
/// handling, after the placeholder slot has been located, and receives the header context that was
/// produced for the same message — so an override can decide per message without touching shared
/// instance state.
/// </summary>
/// <param name="binderType">The type reported by the binder for the streamed slot.</param>
/// <param name="headerContext">Opaque per-message header context; may be <c>null</c>.</param>
/// <returns>The type to deserialize into. The base implementation returns <paramref name="binderType"/> unchanged.</returns>
protected virtual Type ResolveStreamedArgType(Type binderType, object? headerContext)
{
    return binderType;
}
/// <summary>
/// Reads one length-prefixed argument and turns it into a value.
/// The INT32 prefix doubles as a sentinel: 0 means <c>null</c>, -1 marks a streamed argument
/// (a placeholder is returned), and a 1-byte payload whose byte is 0 also means <c>null</c>.
/// Otherwise the payload is sliced out of the sequence and either returned as raw bytes
/// (ByteArray tag) or deserialized into <paramref name="targetType"/>.
/// </summary>
/// <param name="r">Reader positioned at the argument's length prefix.</param>
/// <param name="targetType">Type to deserialize into.</param>
/// <param name="headerContext">
/// Opaque per-message context produced for the same message. Not used by this base implementation;
/// overrides can use it for per-message decoding decisions without shared instance state.
/// </param>
/// <returns>The decoded value, <c>null</c>, or the streamed-argument placeholder.</returns>
protected virtual object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType, object? headerContext)
{
    r.TryReadLittleEndian(out int argLength);

    switch (argLength)
    {
        case 0:
            return null;

        case -1:
            // AsyncSegment: the real value arrives later in chunk frames.
            _logger?.LogTrace("ReadSingleArgument streamed arg marker (-1) → placeholder");
            return StreamedArgPlaceholder;

        case 1:
            // A single 0 byte is the explicit null marker.
            r.TryPeek(out var marker);
            if (marker == 0)
            {
                r.Advance(1);
                return null;
            }
            break;
    }

    // Take the payload as a slice of the incoming sequence (no copy at this point).
    var payload = r.UnreadSequence.Slice(0, argLength);
    r.Advance(argLength);
    LogReadSingleArgument(payload, argLength, targetType);

    // Raw byte[] path: a leading ByteArray tag means the remainder is the payload itself;
    // there is no inner length because argLength already implies it.
    var probe = new SequenceReader<byte>(payload);
    if (probe.TryPeek(out var tag) && tag == BinaryTypeCode.ByteArray)
    {
        return SequenceToByteArray(payload.Slice(1));
    }

    // Everything else goes through the offset-aware array deserializer. GetArgBytes hands back
    // either the segment's own array or a pool-rented copy; the copy must be returned afterwards.
    var bytes = GetArgBytes(payload);
    try
    {
        return AcBinaryDeserializer.Deserialize(bytes.array, bytes.offset, bytes.length, targetType, _options);
    }
    finally
    {
        if (bytes.rented)
        {
            ArrayPool<byte>.Shared.Return(bytes.array);
        }
    }
}
/// <summary>
/// Materializes a sequence as a <c>byte[]</c> without any deserialization.
/// The backing array itself is returned only when the sequence is a single segment that is
/// array-backed and spans that array from index 0 to its end; in every other case a fresh copy is made.
/// </summary>
/// <param name="data">The bytes to expose.</param>
/// <returns>Either the sequence's own backing array or a newly allocated copy.</returns>
/// <remarks>
/// NOTE(review): on the no-copy path the caller receives the very array that backs the input;
/// confirm callers never hand in memory whose owner may reuse it afterwards.
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
{
    if (!data.IsSingleSegment)
    {
        return data.ToArray();
    }

    if (!MemoryMarshal.TryGetArray(data.First, out var backing))
    {
        return data.ToArray();
    }

    var spansWholeArray = backing.Offset == 0 && backing.Count == backing.Array!.Length;
    return spansWholeArray ? backing.Array! : data.ToArray();
}
/// <summary>
/// Exposes the bytes of <paramref name="argSlice"/> as an (array, offset, length) triple suitable
/// for the offset-aware <c>AcBinaryDeserializer.Deserialize</c> overload.
/// <list type="bullet">
/// <item>Single array-backed segment: the segment's own array and bounds are returned — no allocation, no copy.</item>
/// <item>Anything else: the bytes are copied into an array rented from <see cref="ArrayPool{T}.Shared"/>;
/// the caller MUST return that array to the pool when <c>rented</c> is <c>true</c>.</item>
/// </list>
/// </summary>
/// <param name="argSlice">The argument payload.</param>
/// <returns>The array, the start offset, the byte count, and whether the array was rented.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static (byte[] array, int offset, int length, bool rented) GetArgBytes(ReadOnlySequence<byte> argSlice)
{
    if (!argSlice.IsSingleSegment || !MemoryMarshal.TryGetArray(argSlice.First, out var direct))
    {
        // Not directly addressable as one array: gather into a pooled contiguous buffer.
        var byteCount = (int)argSlice.Length;
        var pooled = ArrayPool<byte>.Shared.Rent(byteCount);
        argSlice.CopyTo(pooled);
        return (pooled, 0, byteCount, rented: true);
    }

    return (direct.Array!, direct.Offset, direct.Count, rented: false);
}
#endregion
#region Write Framing Helpers
/// <summary>
/// Writes a presence byte (0 = null, 1 = present) and, when present, the UTF-8 string after it.
/// </summary>
/// <param name="bw">Destination output.</param>
/// <param name="value">The string to write, or <c>null</c>.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value)
{
    if (value is not null)
    {
        bw.WriteByte(1);
        bw.WriteStringUtf8(value);
    }
    else
    {
        bw.WriteByte(0);
    }
}
/// <summary>
/// Writes a string array as a VarUInt element count followed by each element as a UTF-8 string.
/// A <c>null</c> array and an empty array are both written as a count of 0.
/// </summary>
/// <param name="bw">Destination output.</param>
/// <param name="array">The strings to write, or <c>null</c>.</param>
private static void WriteStringArray(ref BufferWriterBinaryOutput bw, string[]? array)
{
    if (array is not { Length: > 0 })
    {
        bw.WriteVarUInt(0);
        return;
    }

    bw.WriteVarUInt((uint)array.Length);
    foreach (var item in array)
    {
        bw.WriteStringUtf8(item);
    }
}
/// <summary>
/// Writes message headers as a VarUInt pair count followed by key/value UTF-8 strings.
/// A <c>null</c> or empty dictionary is written as a count of 0.
/// </summary>
/// <param name="bw">Destination output.</param>
/// <param name="headers">The headers to write, or <c>null</c>.</param>
private static void WriteHeaders(ref BufferWriterBinaryOutput bw, IDictionary<string, string>? headers)
{
    if (headers is null || headers.Count == 0)
    {
        bw.WriteVarUInt(0);
        return;
    }

    bw.WriteVarUInt((uint)headers.Count);
    foreach (var (name, text) in headers)
    {
        bw.WriteStringUtf8(name);
        bw.WriteStringUtf8(text);
    }
}
/// <summary>
/// Returns how many bytes a value occupies in VarUInt form (7 payload bits per byte): 1 to 5.
/// </summary>
/// <param name="value">The value to measure.</param>
/// <returns>The encoded length in bytes.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int VarUIntSize(uint value) => value switch
{
    < 0x80 => 1,        // fits in 7 bits
    < 0x4000 => 2,      // fits in 14 bits
    < 0x200000 => 3,    // fits in 21 bits
    < 0x10000000 => 4,  // fits in 28 bits
    _ => 5,
};
#endregion
#region Sequence Read Helpers
/// <summary>
/// Reads a little-endian 64-bit integer. When fewer than 8 bytes remain the result is 0.
/// </summary>
/// <param name="r">Reader to consume from.</param>
/// <returns>The decoded value, or 0 if the read could not be satisfied.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static long ReadInt64(ref SequenceReader<byte> r)
    => r.TryReadLittleEndian(out long result) ? result : 0L;
/// <summary>
/// Decodes a VarUInt: 7 payload bits per byte, least-significant group first, high bit set on
/// every byte except the last. A 32-bit value never needs more than 5 bytes.
/// </summary>
/// <param name="r">Reader to consume from.</param>
/// <returns>
/// The decoded value. If the input ends before a terminating byte is seen, the bits gathered so
/// far are returned (best-effort, matching the other readers in this class).
/// </returns>
/// <exception cref="InvalidDataException">
/// The encoding runs past 5 bytes without a terminating byte. Previously such input was accepted
/// and silently produced a corrupted value, because C# masks the shift count of a 32-bit shift to
/// its low 5 bits.
/// </exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static uint ReadVarUInt(ref SequenceReader<byte> r)
{
    const int MaxEncodedBytes = 5;

    uint value = 0;
    for (var i = 0; i < MaxEncodedBytes; i++)
    {
        // Truncated input: hand back what has been decoded so far.
        if (!r.TryRead(out var b))
            return value;

        value |= (uint)(b & 0x7F) << (7 * i);

        // High bit clear marks the final byte.
        if ((b & 0x80) == 0)
            return value;
    }

    throw new InvalidDataException("Malformed VarUInt: encoding is longer than 5 bytes.");
}
/// <summary>
/// Reads a UTF-8 string stored as a VarUInt byte count followed by that many bytes.
/// A count of 0 yields <see cref="string.Empty"/>.
/// </summary>
/// <param name="r">Reader to consume from.</param>
/// <returns>The decoded string.</returns>
protected static string ReadString(ref SequenceReader<byte> r)
{
    var length = (int)ReadVarUInt(ref r);
    if (length == 0)
    {
        return string.Empty;
    }

    r.TryReadExact(length, out var utf8);

    // Bytes that straddle segments are flattened into one array before decoding.
    if (!utf8.IsSingleSegment)
    {
        return Encoding.UTF8.GetString(utf8.ToArray());
    }

    return Encoding.UTF8.GetString(utf8.FirstSpan);
}
/// <summary>
/// Reads a presence byte and, unless it is 0, the string that follows it.
/// </summary>
/// <param name="r">Reader to consume from.</param>
/// <returns>
/// <c>null</c> when the presence byte is 0 (or could not be read); otherwise the decoded string.
/// </returns>
protected static string? ReadNullableString(ref SequenceReader<byte> r)
{
    // A failed read leaves the out value at 0, which is treated the same as the null marker.
    var isPresent = r.TryRead(out var presence) && presence != 0;
    return isPresent ? ReadString(ref r) : null;
}
/// <summary>
/// Reads a string array stored as a VarUInt element count followed by that many strings.
/// </summary>
/// <param name="r">Reader to consume from.</param>
/// <returns>The decoded strings, or <c>null</c> when the count is 0.</returns>
/// <exception cref="InvalidDataException">
/// The encoded count is negative after conversion or larger than the number of bytes left to read.
/// </exception>
private static string[]? ReadStringArray(ref SequenceReader<byte> r)
{
    var count = (int)ReadVarUInt(ref r);
    if (count == 0)
        return null;

    // The count comes off the wire and sizes the allocation below. Each string needs at least its
    // one-byte length, so a count above the remaining byte total cannot be genuine.
    if (count < 0 || count > r.Remaining)
        throw new InvalidDataException($"Invalid string array count {count}: only {r.Remaining} byte(s) remain.");

    var array = new string[count];
    for (var i = 0; i < count; i++)
        array[i] = ReadString(ref r);
    return array;
}
/// <summary>
/// Reads message headers stored as a VarUInt pair count followed by key/value strings.
/// </summary>
/// <param name="r">Reader to consume from.</param>
/// <returns>
/// The decoded headers (ordinal key comparison; a repeated key keeps its last value), or
/// <c>null</c> when nothing remains to read or the count is 0.
/// </returns>
/// <exception cref="InvalidDataException">
/// The encoded count is negative after conversion or implies more bytes than are left to read.
/// </exception>
private static Dictionary<string, string>? ReadHeaders(ref SequenceReader<byte> r)
{
    // The header section is optional: an exhausted reader means "no headers".
    if (r.Remaining == 0)
        return null;

    var count = (int)ReadVarUInt(ref r);
    if (count == 0)
        return null;

    // The count comes off the wire and is used as the dictionary capacity. Each pair needs at
    // least two bytes (one length byte per string), so reject counts the remaining data cannot hold.
    if (count < 0 || (long)count * 2 > r.Remaining)
        throw new InvalidDataException($"Invalid header count {count}: only {r.Remaining} byte(s) remain.");

    var headers = new Dictionary<string, string>(count, StringComparer.Ordinal);
    for (var i = 0; i < count; i++)
    {
        var key = ReadString(ref r);
        var value = ReadString(ref r);
        headers[key] = value;
    }
    return headers;
}
#endregion
#region Helpers
/// <summary>
/// Returns an invocation message that carries <paramref name="invocationId"/>.
/// With a <c>null</c> id the given message is returned as-is; otherwise a new message is built
/// from the id and the original target and arguments.
/// </summary>
/// <param name="msg">The message parsed so far.</param>
/// <param name="invocationId">The invocation id to attach, or <c>null</c>.</param>
/// <returns>The original message or a new one with the id set.</returns>
private static InvocationMessage ApplyInvocationId(InvocationMessage msg, string? invocationId)
    => invocationId is null
        ? msg
        : new InvocationMessage(invocationId, msg.Target, msg.Arguments);
/// <summary>
/// Attaches headers to a message when it is a <see cref="HubInvocationMessage"/>;
/// any other message kind is left untouched.
/// </summary>
/// <param name="msg">The message to decorate.</param>
/// <param name="headers">The headers to assign.</param>
private static void SetHeaders(HubMessage msg, Dictionary<string, string> headers)
{
    if (msg is not HubInvocationMessage target)
    {
        return;
    }

    target.Headers = headers;
}
#endregion
}