Optimize Pipe sync handling, logging, and chunked protocol

Refactor synchronous PipeReader/PipeWriter usage to avoid Task allocations via fast-path helpers. Add detailed debug/trace logging for chunked message flows and deserialization. Track active reads to prevent protocol errors. Refactor FindStreamedArgSlot and introduce ResolveStreamedArgType for dynamic streamed arg type resolution, with AyCodeBinaryHubProtocol override. Minor code cleanups and improved logging context throughout. Improves performance, correctness, and debuggability.
This commit is contained in:
Loretta 2026-04-11 18:07:31 +02:00
parent 82a407ff82
commit e73e1b6364
4 changed files with 197 additions and 103 deletions

View File

@ -63,6 +63,18 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
_lastFlush = default; _lastFlush = default;
} }
/// <summary>
/// Synchronously awaits a FlushAsync ValueTask.
/// Fast-path: if already completed, returns without Task allocation.
/// Slow-path: converts to Task for proper blocking (backpressure).
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void SyncAwaitFlush(ValueTask<FlushResult> vt)
{
if (!vt.IsCompletedSuccessfully)
vt.AsTask().GetAwaiter().GetResult();
}
/// <summary> /// <summary>
/// Provides the initial buffer from the PipeWriter with 3-byte header reservation. /// Provides the initial buffer from the PipeWriter with 3-byte header reservation.
/// </summary> /// </summary>
@ -82,9 +94,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
[MethodImpl(MethodImplOptions.NoInlining)] [MethodImpl(MethodImplOptions.NoInlining)]
public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed)
{ {
// Backpressure: wait for previous flush if still in progress // Backpressure: wait for previous flush if still in progress,
if (_waitForFlush && !_lastFlush.IsCompleted) // or if committed bytes approach the Pipe's PauseWriterThreshold (~64KB)
_lastFlush.GetAwaiter().GetResult(); // to prevent unbounded memory growth in waitForFlush=false mode.
if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize)
SyncAwaitFlush(_lastFlush);
CommitCurrentChunk(buffer, position); CommitCurrentChunk(buffer, position);
@ -115,8 +129,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
public void Flush(byte[] buffer, int position) public void Flush(byte[] buffer, int position)
{ {
// Wait for any in-flight flush from previous Grow // Wait for any in-flight flush from previous Grow
if (!_lastFlush.IsCompleted) SyncAwaitFlush(_lastFlush);
_lastFlush.GetAwaiter().GetResult();
CommitCurrentChunk(buffer, position); CommitCurrentChunk(buffer, position);
} }

View File

@ -24,6 +24,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
private SequencePosition _nextSegmentPosition; private SequencePosition _nextSegmentPosition;
private SequencePosition _consumedUpTo; private SequencePosition _consumedUpTo;
private bool _pipeCompleted; private bool _pipeCompleted;
private bool _hasActiveRead;
// Cross-boundary scratch — same pattern as SequenceBinaryInput // Cross-boundary scratch — same pattern as SequenceBinaryInput
private byte[]? _scratchBuffer; private byte[]? _scratchBuffer;
@ -32,6 +33,15 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
private int _savedPosition; private int _savedPosition;
private int _savedBufferLength; private int _savedBufferLength;
/// <summary>
/// Synchronously gets the result of a PipeReader.ReadAsync ValueTask.
/// Fast-path: if already completed (data in pipe), returns directly without Task allocation.
/// Slow-path: converts to Task for proper blocking when waiting for writer.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ReadResult SyncReadResult(ValueTask<ReadResult> vt)
=> vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
public PipeReaderBinaryInput(PipeReader pipeReader) public PipeReaderBinaryInput(PipeReader pipeReader)
{ {
_pipeReader = pipeReader; _pipeReader = pipeReader;
@ -39,6 +49,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
_nextSegmentPosition = default; _nextSegmentPosition = default;
_consumedUpTo = default; _consumedUpTo = default;
_pipeCompleted = false; _pipeCompleted = false;
_hasActiveRead = false;
_scratchBuffer = null; _scratchBuffer = null;
_afterCrossBoundary = false; _afterCrossBoundary = false;
_savedBuffer = null; _savedBuffer = null;
@ -51,9 +62,10 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
/// </summary> /// </summary>
public void Initialize(out byte[] buffer, out int position, out int bufferLength) public void Initialize(out byte[] buffer, out int position, out int bufferLength)
{ {
var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer; _currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted; _pipeCompleted = result.IsCompleted;
_hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start; _consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start; _nextSegmentPosition = _currentBuffer.Start;
@ -101,10 +113,12 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
return false; return false;
_pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
_hasActiveRead = false;
var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer; _currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted; _pipeCompleted = result.IsCompleted;
_hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start; _consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start; _nextSegmentPosition = _currentBuffer.Start;
@ -128,7 +142,11 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
_scratchBuffer = null; _scratchBuffer = null;
} }
_pipeReader.AdvanceTo(_currentBuffer.End); if (_hasActiveRead)
{
_pipeReader.AdvanceTo(_currentBuffer.End);
_hasActiveRead = false;
}
} }
private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength) private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength)
@ -178,9 +196,11 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
return false; return false;
_pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); _hasActiveRead = false;
var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer; _currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted; _pipeCompleted = result.IsCompleted;
_hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start; _consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start; _nextSegmentPosition = _currentBuffer.Start;

View File

@ -100,6 +100,15 @@ public class AcBinaryHubProtocol : IHubProtocol
public int Version => 1; public int Version => 1;
public TransferFormat TransferFormat => TransferFormat.Binary; public TransferFormat TransferFormat => TransferFormat.Binary;
/// <summary>
/// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask.
/// Fast-path: if already completed (no backpressure), returns directly without Task allocation.
/// Slow-path: converts to Task for proper blocking when pipe backpressure is active.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static FlushResult SyncFlush(ValueTask<FlushResult> vt)
=> vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsVersionSupported(int version) => version <= Version; public bool IsVersionSupported(int version) => version <= Version;
@ -130,7 +139,7 @@ public class AcBinaryHubProtocol : IHubProtocol
output.Advance(LengthPrefixSize); output.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize); var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize);
int externalBytes = 0; var externalBytes = 0;
switch (message) switch (message)
{ {
@ -179,6 +188,9 @@ public class AcBinaryHubProtocol : IHubProtocol
var totalPayload = bw.Position + externalBytes; var totalPayload = bw.Position + externalBytes;
bw.Flush(); bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload);
} }
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes) private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
@ -296,13 +308,17 @@ public class AcBinaryHubProtocol : IHubProtocol
{ {
var (streamedArg, streamedArgIndex) = GetStreamedArg(message); var (streamedArg, streamedArgIndex) = GetStreamedArg(message);
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("WriteMessageChunked {MessageType} streamedArgIndex={StreamedArgIndex} streamedArgType={StreamedArgType}",
message.GetType().Name, streamedArgIndex, streamedArg?.GetType().Name ?? "null");
// --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) --- // --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) ---
{ {
var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize); var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize);
pipeWriter.Advance(LengthPrefixSize); pipeWriter.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize); var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize);
int externalBytes = 0; var externalBytes = 0;
bw.WriteByte(MsgAsyncChunkStart); bw.WriteByte(MsgAsyncChunkStart);
@ -347,18 +363,25 @@ public class AcBinaryHubProtocol : IHubProtocol
var totalPayload = bw.Position + externalBytes; var totalPayload = bw.Position + externalBytes;
bw.Flush(); bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
_logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", totalPayload);
} }
pipeWriter.FlushAsync().GetAwaiter().GetResult(); SyncFlush(pipeWriter.FlushAsync());
// --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
if (streamedArg != null) if (streamedArg != null)
AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options); {
var dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options);
_logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes);
}
// --- CHUNK_END [202] --- // --- CHUNK_END [202] ---
var endByte = pipeWriter.GetSpan(1); var endByte = pipeWriter.GetSpan(1);
endByte[0] = MsgAsyncChunkEnd; endByte[0] = MsgAsyncChunkEnd;
pipeWriter.Advance(1); pipeWriter.Advance(1);
pipeWriter.FlushAsync().GetAwaiter().GetResult(); SyncFlush(pipeWriter.FlushAsync());
_logger?.LogTrace("WriteMessageChunked CHUNK_END written");
} }
/// <summary> /// <summary>
@ -389,7 +412,10 @@ public class AcBinaryHubProtocol : IHubProtocol
// AsyncSegment chunk mode: non-standard framing (no INT32 length prefix) // AsyncSegment chunk mode: non-standard framing (no INT32 length prefix)
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState)) if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState))
{
_logger?.LogTrace("TryParseMessage chunk mode active, inputLength={InputLength}", input.Length);
return TryParseChunkData(ref input, chunkState, binder, out message); return TryParseChunkData(ref input, chunkState, binder, out message);
}
// Normal path // Normal path
var reader = new SequenceReader<byte>(input); var reader = new SequenceReader<byte>(input);
@ -399,18 +425,27 @@ public class AcBinaryHubProtocol : IHubProtocol
if (reader.Remaining < payloadLength) if (reader.Remaining < payloadLength)
return false; return false;
_logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length);
message = ParseMessage(ref reader, payloadLength, binder); message = ParseMessage(ref reader, payloadLength, binder);
input = input.Slice(LengthPrefixSize + payloadLength); input = input.Slice(LengthPrefixSize + payloadLength);
if (message != null) if (message != null)
{
if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name);
return true; return true;
}
// CHUNK_START consumed but no message yet — chunk mode just activated. // CHUNK_START consumed but no message yet — chunk mode just activated.
// Must try chunk data immediately; returning false here would cause SignalR // Must try chunk data immediately; returning false here would cause SignalR
// to call AdvanceTo(examined=end) and wait for new data, even though // to call AdvanceTo(examined=end) and wait for new data, even though
// CHUNK_DATA/CHUNK_END may already be in the remaining buffer. // CHUNK_DATA/CHUNK_END may already be in the remaining buffer.
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState)) if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState))
{
_logger?.LogDebug("TryParseMessage CHUNK_START activated, fallthrough to TryParseChunkData remainingInput={RemainingInput}", input.Length);
return TryParseChunkData(ref input, chunkState, binder, out message); return TryParseChunkData(ref input, chunkState, binder, out message);
}
return false; return false;
} }
@ -423,7 +458,7 @@ public class AcBinaryHubProtocol : IHubProtocol
// Mark end position so Parse* methods can check Remaining relative to payload // Mark end position so Parse* methods can check Remaining relative to payload
var payloadEnd = r.Consumed + payloadLength; var payloadEnd = r.Consumed + payloadLength;
r.TryRead(out byte msgType); r.TryRead(out var msgType);
return msgType switch return msgType switch
{ {
@ -454,9 +489,10 @@ public class AcBinaryHubProtocol : IHubProtocol
private void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType) private void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType)
{ {
if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
var segmentCount = 0; var segmentCount = 0;
foreach (var _ in argSlice) foreach (var _ in argSlice) segmentCount++;
segmentCount++;
_logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}", _logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}",
argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name); argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name);
} }
@ -465,8 +501,10 @@ public class AcBinaryHubProtocol : IHubProtocol
private void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining) private void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining)
{ {
if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
var typeNames = new string[paramTypes.Count]; var typeNames = new string[paramTypes.Count];
for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name; for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name;
_logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}", _logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}",
target, paramTypes.Count, string.Join(", ", typeNames), remaining); target, paramTypes.Count, string.Join(", ", typeNames), remaining);
} }
@ -483,13 +521,9 @@ public class AcBinaryHubProtocol : IHubProtocol
var streamIds = ReadStringArray(ref r); var streamIds = ReadStringArray(ref r);
var headers = ReadHeaders(ref r); var headers = ReadHeaders(ref r);
var msg = streamIds is { Length: > 0 } var msg = streamIds is { Length: > 0 } ? new InvocationMessage(invocationId, target, args, streamIds) : ApplyInvocationId(new InvocationMessage(target, args), invocationId);
? new InvocationMessage(invocationId, target, args, streamIds)
: ApplyInvocationId(new InvocationMessage(target, args), invocationId);
if (headers != null)
SetHeaders(msg, headers);
if (headers != null) SetHeaders(msg, headers);
return msg; return msg;
} }
@ -503,8 +537,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r); var headers = ReadHeaders(ref r);
var msg = new StreamInvocationMessage(invocationId, target, args, streamIds); var msg = new StreamInvocationMessage(invocationId, target, args, streamIds);
if (headers != null) if (headers != null) SetHeaders(msg, headers);
SetHeaders(msg, headers);
return msg; return msg;
} }
@ -517,8 +550,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r); var headers = ReadHeaders(ref r);
var msg = new StreamItemMessage(invocationId, item); var msg = new StreamItemMessage(invocationId, item);
if (headers != null) if (headers != null) SetHeaders(msg, headers);
SetHeaders(msg, headers);
return msg; return msg;
} }
@ -527,7 +559,8 @@ public class AcBinaryHubProtocol : IHubProtocol
{ {
var invocationId = ReadString(ref r); var invocationId = ReadString(ref r);
var error = ReadNullableString(ref r); var error = ReadNullableString(ref r);
r.TryRead(out byte hasResultByte);
r.TryRead(out var hasResultByte);
var hasResult = hasResultByte == 1; var hasResult = hasResultByte == 1;
object? result = null; object? result = null;
@ -540,15 +573,11 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r); var headers = ReadHeaders(ref r);
CompletionMessage msg; CompletionMessage msg;
if (error != null) if (error != null) msg = CompletionMessage.WithError(invocationId, error);
msg = CompletionMessage.WithError(invocationId, error); else if (hasResult) msg = CompletionMessage.WithResult(invocationId, result);
else if (hasResult) else msg = CompletionMessage.Empty(invocationId);
msg = CompletionMessage.WithResult(invocationId, result);
else
msg = CompletionMessage.Empty(invocationId);
if (headers != null) if (headers != null) SetHeaders(msg, headers);
SetHeaders(msg, headers);
return msg; return msg;
} }
@ -559,8 +588,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r); var headers = ReadHeaders(ref r);
var msg = new CancelInvocationMessage(invocationId); var msg = new CancelInvocationMessage(invocationId);
if (headers != null) if (headers != null) SetHeaders(msg, headers);
SetHeaders(msg, headers);
return msg; return msg;
} }
@ -568,7 +596,8 @@ public class AcBinaryHubProtocol : IHubProtocol
private static HubMessage ParseClose(ref SequenceReader<byte> r) private static HubMessage ParseClose(ref SequenceReader<byte> r)
{ {
var error = ReadNullableString(ref r); var error = ReadNullableString(ref r);
r.TryRead(out byte reconnectByte); r.TryRead(out var reconnectByte);
var allowReconnect = reconnectByte == 1; var allowReconnect = reconnectByte == 1;
return new CloseMessage(error, allowReconnect); return new CloseMessage(error, allowReconnect);
} }
@ -606,24 +635,28 @@ public class AcBinaryHubProtocol : IHubProtocol
var totalNeeded = 3 + chunkDataSize; // header (3) + data var totalNeeded = 3 + chunkDataSize; // header (3) + data
if (input.Length < totalNeeded) return false; if (input.Length < totalNeeded) return false;
_logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length);
// Write chunk data to internal pipe for background deserialization // Write chunk data to internal pipe for background deserialization
if (chunkDataSize > 0) if (chunkDataSize > 0)
{ {
var dataSlice = input.Slice(3, chunkDataSize); var dataSlice = input.Slice(3, chunkDataSize);
foreach (var segment in dataSlice) foreach (var segment in dataSlice)
state.InternalPipe.Writer.Write(segment.Span); state.InternalPipe.Writer.Write(segment.Span);
state.InternalPipe.Writer.FlushAsync().GetAwaiter().GetResult(); SyncFlush(state.InternalPipe.Writer.FlushAsync());
} }
// Lazy start: begin background deserialization after first chunk is in the pipe. // Lazy start: begin background deserialization after first chunk is in the pipe.
// Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available. // Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available.
if (state.DeserTask == null) if (state.DeserTask == null)
{ {
_logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name);
var pipeReader = state.InternalPipe.Reader; var pipeReader = state.InternalPipe.Reader;
var type = state.StreamedArgType; var type = state.StreamedArgType;
var opts = _options; var opts = _options;
state.DeserTask = Task.Run(() =>
(object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts)); state.DeserTask = Task.Run(() => (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts));
} }
input = input.Slice(totalNeeded); input = input.Slice(totalNeeded);
@ -632,25 +665,32 @@ public class AcBinaryHubProtocol : IHubProtocol
if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data) if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data)
{ {
// Signal end of data → background deser task completes _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — completing pipe");
state.InternalPipe.Writer.Complete();
object? deserializedArg = null; // Signal end of data → background deser task completes
if (state.DeserTask != null) state.InternalPipe.Writer.Complete();
{ object? deserializedArg = null;
deserializedArg = state.DeserTask.GetAwaiter().GetResult();
state.InternalPipe.Reader.Complete(); if (state.DeserTask != null)
{
deserializedArg = state.DeserTask.GetAwaiter().GetResult();
state.InternalPipe.Reader.Complete();
if (_logger!.IsEnabled(LogLevel.Debug)) _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null");
}
// Fill the placeholder in the stored message's args
FillStreamedArg(state, deserializedArg);
_chunkStates!.Remove(binder);
input = input.Slice(1); // consume the single [202] byte
message = state.PartialMessage;
return true;
} }
// Fill the placeholder in the stored message's args
FillStreamedArg(state, deserializedArg);
_chunkStates!.Remove(binder);
input = input.Slice(1); // consume the single [202] byte
message = state.PartialMessage;
return true;
}
// Unknown byte in chunk mode — break out (shouldn't happen) // Unknown byte in chunk mode — break out (shouldn't happen)
_logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking", firstByte);
break; break;
} }
@ -664,7 +704,9 @@ public class AcBinaryHubProtocol : IHubProtocol
/// </summary> /// </summary>
private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder) private HubMessage? ParseAsyncChunkStart(ref SequenceReader<byte> r, IInvocationBinder binder)
{ {
r.TryRead(out byte originalMsgType); r.TryRead(out var originalMsgType);
_logger?.LogDebug("ParseAsyncChunkStart innerMsgType={InnerMsgType}", originalMsgType);
// Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments // Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments
var partialMessage = originalMsgType switch var partialMessage = originalMsgType switch
@ -680,6 +722,10 @@ public class AcBinaryHubProtocol : IHubProtocol
// Find the placeholder arg and its target type // Find the placeholder arg and its target type
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder); var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
streamedType = ResolveStreamedArgType(streamedType);
_logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
streamedIndex, streamedType.Name);
var state = new AsyncChunkState var state = new AsyncChunkState
{ {
@ -699,61 +745,59 @@ public class AcBinaryHubProtocol : IHubProtocol
/// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array, /// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array,
/// placeholder index, and the target deserialization type. /// placeholder index, and the target deserialization type.
/// </summary> /// </summary>
private static (object?[] args, int index, Type type) FindStreamedArgSlot( private static (object?[] args, int index, Type type) FindStreamedArgSlot(HubMessage message, IInvocationBinder binder)
HubMessage message, IInvocationBinder binder)
{ {
switch (message) switch (message)
{ {
case InvocationMessage inv: case InvocationMessage inv:
{
var paramTypes = binder.GetParameterTypes(inv.Target);
for (var i = 0; i < inv.Arguments.Length; i++)
{ {
if (ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) var paramTypes = binder.GetParameterTypes(inv.Target);
for (var i = 0; i < inv.Arguments.Length; i++)
{ {
if (!ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) continue;
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (inv.Arguments, i, type); return (inv.Arguments, i, type);
} }
break;
} }
break;
}
case StreamInvocationMessage sinv: case StreamInvocationMessage sinv:
{
var paramTypes = binder.GetParameterTypes(sinv.Target);
for (var i = 0; i < sinv.Arguments.Length; i++)
{ {
if (ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) var paramTypes = binder.GetParameterTypes(sinv.Target);
for (var i = 0; i < sinv.Arguments.Length; i++)
{ {
if (!ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) continue;
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (sinv.Arguments, i, type); return (sinv.Arguments, i, type);
} }
break;
} }
break;
}
case StreamItemMessage si: case StreamItemMessage si:
{
if (ReferenceEquals(si.Item, StreamedArgPlaceholder))
{ {
// StreamItemMessage.Item is read-only, use a wrapper array if (ReferenceEquals(si.Item, StreamedArgPlaceholder))
var args = new object?[] { si.Item }; {
var type = binder.GetStreamItemType(si.InvocationId!); // StreamItemMessage.Item is read-only, use a wrapper array
return (args, 0, type); var args = new object?[] { si.Item };
var type = binder.GetStreamItemType(si.InvocationId!);
return (args, 0, type);
}
break;
} }
break;
}
case CompletionMessage comp: case CompletionMessage comp:
{
if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder))
{ {
var args = new object?[] { comp.Result }; if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder))
var type = binder.GetReturnType(comp.InvocationId!); {
return (args, 0, type); var args = new object?[] { comp.Result };
var type = binder.GetReturnType(comp.InvocationId!);
return (args, 0, type);
}
break;
} }
break;
}
} }
return (Array.Empty<object?>(), -1, typeof(object)); return ([], -1, typeof(object));
} }
/// <summary> /// <summary>
@ -773,13 +817,11 @@ public class AcBinaryHubProtocol : IHubProtocol
break; break;
case StreamItemMessage: case StreamItemMessage:
// StreamItemMessage.Item has no public setter — need to create a new message // StreamItemMessage.Item has no public setter — need to create a new message
if (state.PartialMessage is StreamItemMessage si) if (state.PartialMessage is StreamItemMessage si) state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue);
state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue);
break; break;
case CompletionMessage: case CompletionMessage:
// CompletionMessage.Result has no public setter — need to create a new message // CompletionMessage.Result has no public setter — need to create a new message
if (state.PartialMessage is CompletionMessage comp) if (state.PartialMessage is CompletionMessage comp) state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue);
state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue);
break; break;
} }
} }
@ -791,8 +833,8 @@ public class AcBinaryHubProtocol : IHubProtocol
private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes) private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes)
{ {
bw.WriteVarUInt((uint)arguments.Length); bw.WriteVarUInt((uint)arguments.Length);
for (var i = 0; i < arguments.Length; i++)
WriteArgument(ref bw, output, arguments[i], ref externalBytes); for (var i = 0; i < arguments.Length; i++) WriteArgument(ref bw, output, arguments[i], ref externalBytes);
} }
private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes) private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes)
@ -826,6 +868,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var serialized = AcBinarySerializer.Serialize(value, _options); var serialized = AcBinarySerializer.Serialize(value, _options);
bw.WriteRaw(serialized.Length); bw.WriteRaw(serialized.Length);
bw.WriteBytes(serialized); bw.WriteBytes(serialized);
return; return;
} }
@ -866,6 +909,12 @@ public class AcBinaryHubProtocol : IHubProtocol
protected virtual void OnArgumentRead(object? value, int index) { } protected virtual void OnArgumentRead(object? value, int index) { }
/// <summary>
/// Override to resolve typeof(object) to a concrete type (e.g., from SignalParams).
/// Called after FindStreamedArgSlot in chunked deserialization.
/// </summary>
protected virtual Type ResolveStreamedArgType(Type binderType) => binderType;
/// <summary> /// <summary>
/// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer. /// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer.
/// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[]. /// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[].
@ -874,17 +923,19 @@ public class AcBinaryHubProtocol : IHubProtocol
protected virtual object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType) protected virtual object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
{ {
r.TryReadLittleEndian(out int argLength); r.TryReadLittleEndian(out int argLength);
if (argLength == 0) if (argLength == 0) return null;
return null;
// AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization // AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization
if (argLength == -1) if (argLength == -1)
{
_logger?.LogTrace("ReadSingleArgument streamed arg marker (-1) → placeholder");
return StreamedArgPlaceholder; return StreamedArgPlaceholder;
}
// Null marker check // Null marker check
if (argLength == 1) if (argLength == 1)
{ {
r.TryPeek(out byte marker); r.TryPeek(out var marker);
if (marker == 0) { r.Advance(1); return null; } if (marker == 0) { r.Advance(1); return null; }
} }
@ -897,7 +948,7 @@ public class AcBinaryHubProtocol : IHubProtocol
// byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag → // byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag →
// strip tag, rest is raw payload. No VarUInt length (argLength implies size). // strip tag, rest is raw payload. No VarUInt length (argLength implies size).
var argReader = new SequenceReader<byte>(argSlice); var argReader = new SequenceReader<byte>(argSlice);
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray) if (argReader.TryPeek(out var tag) && tag == BinaryTypeCode.ByteArray)
{ {
return SequenceToByteArray(argSlice.Slice(1)); return SequenceToByteArray(argSlice.Slice(1));
} }
@ -919,8 +970,7 @@ public class AcBinaryHubProtocol : IHubProtocol
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static byte[] SequenceToByteArray(ReadOnlySequence<byte> data) protected static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
{ {
if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg) if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg) && seg.Offset == 0 && seg.Count == seg.Array!.Length)
&& seg.Offset == 0 && seg.Count == seg.Array!.Length)
return seg.Array; return seg.Array;
return data.ToArray(); return data.ToArray();
@ -1003,7 +1053,7 @@ public class AcBinaryHubProtocol : IHubProtocol
{ {
uint value = 0; uint value = 0;
var shift = 0; var shift = 0;
while (r.TryRead(out byte b)) while (r.TryRead(out var b))
{ {
value |= (uint)(b & 0x7F) << shift; value |= (uint)(b & 0x7F) << shift;
if ((b & 0x80) == 0) if ((b & 0x80) == 0)
@ -1027,7 +1077,7 @@ public class AcBinaryHubProtocol : IHubProtocol
private static string? ReadNullableString(ref SequenceReader<byte> r) private static string? ReadNullableString(ref SequenceReader<byte> r)
{ {
r.TryRead(out byte marker); r.TryRead(out var marker);
return marker == 0 ? null : ReadString(ref r); return marker == 0 ? null : ReadString(ref r);
} }

View File

@ -74,4 +74,15 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
return DeserializeFromSequence(argSlice, targetType, Options); return DeserializeFromSequence(argSlice, targetType, Options);
} }
protected override Type ResolveStreamedArgType(Type binderType)
{
if (binderType == typeof(object) && _currentSignalParams?.SignalDataType != null)
{
var resolved = Type.GetType(_currentSignalParams.SignalDataType);
if (resolved != null)
return resolved;
}
return binderType;
}
} }