From e73e1b63648a06eaf83cf9e9a25b5b04bba64e7c Mon Sep 17 00:00:00 2001 From: Loretta Date: Sat, 11 Apr 2026 18:07:31 +0200 Subject: [PATCH] Optimize Pipe sync handling, logging, and chunked protocol Refactor synchronous PipeReader/PipeWriter usage to avoid Task allocations via fast-path helpers. Add detailed debug/trace logging for chunked message flows and deserialization. Track active reads to prevent protocol errors. Refactor FindStreamedArgSlot and introduce ResolveStreamedArgType for dynamic streamed arg type resolution, with AyCodeBinaryHubProtocol override. Minor code cleanups and improved logging context throughout. Improves performance, correctness, and debuggability. --- .../Binaries/AsyncPipeWriterOutput.cs | 23 +- .../Binaries/PipeReaderBinaryInput.cs | 28 ++- .../SignalRs/AcBinaryHubProtocol.cs | 238 +++++++++++------- .../SignalRs/AyCodeBinaryHubProtocol.cs | 11 + 4 files changed, 197 insertions(+), 103 deletions(-) diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index 8b953a9..7dc044c 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -63,6 +63,18 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase _lastFlush = default; } + /// + /// Synchronously awaits a FlushAsync ValueTask. + /// Fast-path: if already completed, returns without Task allocation. + /// Slow-path: converts to Task for proper blocking (backpressure). + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static void SyncAwaitFlush(ValueTask vt) + { + if (!vt.IsCompletedSuccessfully) + vt.AsTask().GetAwaiter().GetResult(); + } + /// /// Provides the initial buffer from the PipeWriter with 3-byte header reservation. /// @@ -82,9 +94,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase [MethodImpl(MethodImplOptions.NoInlining)] public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed) { - // Backpressure: wait for previous flush if still in progress - if (_waitForFlush && !_lastFlush.IsCompleted) - _lastFlush.GetAwaiter().GetResult(); + // Backpressure: wait for previous flush if still in progress, + // or if committed bytes approach the Pipe's PauseWriterThreshold (~64KB) + // to prevent unbounded memory growth in waitForFlush=false mode. + if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize) + SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); @@ -115,8 +129,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase public void Flush(byte[] buffer, int position) { // Wait for any in-flight flush from previous Grow - if (!_lastFlush.IsCompleted) - _lastFlush.GetAwaiter().GetResult(); + SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); } diff --git a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs index 86f158c..516e35d 100644 --- a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs +++ b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs @@ -24,6 +24,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase private SequencePosition _nextSegmentPosition; private SequencePosition _consumedUpTo; private bool _pipeCompleted; + private bool _hasActiveRead; // Cross-boundary scratch — same pattern as SequenceBinaryInput private byte[]? _scratchBuffer; @@ -32,6 +33,15 @@ public struct PipeReaderBinaryInput : IBinaryInputBase private int _savedPosition; private int _savedBufferLength; + /// + /// Synchronously gets the result of a PipeReader.ReadAsync ValueTask. + /// Fast-path: if already completed (data in pipe), returns directly without Task allocation. + /// Slow-path: converts to Task for proper blocking when waiting for writer. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static ReadResult SyncReadResult(ValueTask vt) + => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult(); + public PipeReaderBinaryInput(PipeReader pipeReader) { _pipeReader = pipeReader; @@ -39,6 +49,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase _nextSegmentPosition = default; _consumedUpTo = default; _pipeCompleted = false; + _hasActiveRead = false; _scratchBuffer = null; _afterCrossBoundary = false; _savedBuffer = null; @@ -51,9 +62,10 @@ public struct PipeReaderBinaryInput : IBinaryInputBase /// public void Initialize(out byte[] buffer, out int position, out int bufferLength) { - var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); + var result = SyncReadResult(_pipeReader.ReadAsync()); _currentBuffer = result.Buffer; _pipeCompleted = result.IsCompleted; + _hasActiveRead = true; _consumedUpTo = _currentBuffer.Start; _nextSegmentPosition = _currentBuffer.Start; @@ -101,10 +113,12 @@ public struct PipeReaderBinaryInput : IBinaryInputBase return false; _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); + _hasActiveRead = false; - var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); + var result = SyncReadResult(_pipeReader.ReadAsync()); _currentBuffer = result.Buffer; _pipeCompleted = result.IsCompleted; + _hasActiveRead = true; _consumedUpTo = _currentBuffer.Start; _nextSegmentPosition = _currentBuffer.Start; @@ -128,7 +142,11 @@ public struct PipeReaderBinaryInput : IBinaryInputBase _scratchBuffer = null; } - _pipeReader.AdvanceTo(_currentBuffer.End); + if (_hasActiveRead) + { + _pipeReader.AdvanceTo(_currentBuffer.End); + _hasActiveRead = false; + } } private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength) @@ -178,9 +196,11 @@ public struct PipeReaderBinaryInput : IBinaryInputBase return false; _pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End); - var result = _pipeReader.ReadAsync().GetAwaiter().GetResult(); + _hasActiveRead = false; + var result = SyncReadResult(_pipeReader.ReadAsync()); _currentBuffer = result.Buffer; _pipeCompleted = result.IsCompleted; + _hasActiveRead = true; _consumedUpTo = _currentBuffer.Start; _nextSegmentPosition = _currentBuffer.Start; diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 944924b..55db0ab 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -100,6 +100,15 @@ public class AcBinaryHubProtocol : IHubProtocol public int Version => 1; public TransferFormat TransferFormat => TransferFormat.Binary; + /// + /// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask. + /// Fast-path: if already completed (no backpressure), returns directly without Task allocation. + /// Slow-path: converts to Task for proper blocking when pipe backpressure is active. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static FlushResult SyncFlush(ValueTask vt) + => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult(); + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsVersionSupported(int version) => version <= Version; @@ -130,7 +139,7 @@ public class AcBinaryHubProtocol : IHubProtocol output.Advance(LengthPrefixSize); var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize); - int externalBytes = 0; + var externalBytes = 0; switch (message) { @@ -179,6 +188,9 @@ public class AcBinaryHubProtocol : IHubProtocol var totalPayload = bw.Position + externalBytes; bw.Flush(); Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); + + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload); } private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, InvocationMessage m, ref int externalBytes) @@ -296,13 +308,17 @@ public class AcBinaryHubProtocol : IHubProtocol { var (streamedArg, streamedArgIndex) = GetStreamedArg(message); + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("WriteMessageChunked {MessageType} streamedArgIndex={StreamedArgIndex} streamedArgType={StreamedArgType}", + message.GetType().Name, streamedArgIndex, streamedArg?.GetType().Name ?? "null"); + // --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) --- { var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize); pipeWriter.Advance(LengthPrefixSize); var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize); - int externalBytes = 0; + var externalBytes = 0; bw.WriteByte(MsgAsyncChunkStart); @@ -347,18 +363,25 @@ public class AcBinaryHubProtocol : IHubProtocol var totalPayload = bw.Position + externalBytes; bw.Flush(); Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); + + _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", totalPayload); } - pipeWriter.FlushAsync().GetAwaiter().GetResult(); + SyncFlush(pipeWriter.FlushAsync()); // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- if (streamedArg != null) - AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options); + { + var dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options); + _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes); + } // --- CHUNK_END [202] --- var endByte = pipeWriter.GetSpan(1); endByte[0] = MsgAsyncChunkEnd; pipeWriter.Advance(1); - pipeWriter.FlushAsync().GetAwaiter().GetResult(); + SyncFlush(pipeWriter.FlushAsync()); + + _logger?.LogTrace("WriteMessageChunked CHUNK_END written"); } /// @@ -389,7 +412,10 @@ public class AcBinaryHubProtocol : IHubProtocol // AsyncSegment chunk mode: non-standard framing (no INT32 length prefix) if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState)) + { + _logger?.LogTrace("TryParseMessage chunk mode active, inputLength={InputLength}", input.Length); return TryParseChunkData(ref input, chunkState, binder, out message); + } // Normal path var reader = new SequenceReader(input); @@ -399,18 +425,27 @@ public class AcBinaryHubProtocol : IHubProtocol if (reader.Remaining < payloadLength) return false; + _logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length); + message = ParseMessage(ref reader, payloadLength, binder); input = input.Slice(LengthPrefixSize + payloadLength); if (message != null) + { + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name); return true; + } // CHUNK_START consumed but no message yet — chunk mode just activated. // Must try chunk data immediately; returning false here would cause SignalR // to call AdvanceTo(examined=end) and wait for new data, even though // CHUNK_DATA/CHUNK_END may already be in the remaining buffer. if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState)) + { + _logger?.LogDebug("TryParseMessage CHUNK_START activated, fallthrough to TryParseChunkData remainingInput={RemainingInput}", input.Length); return TryParseChunkData(ref input, chunkState, binder, out message); + } return false; } @@ -423,7 +458,7 @@ public class AcBinaryHubProtocol : IHubProtocol // Mark end position so Parse* methods can check Remaining relative to payload var payloadEnd = r.Consumed + payloadLength; - r.TryRead(out byte msgType); + r.TryRead(out var msgType); return msgType switch { @@ -454,9 +489,10 @@ public class AcBinaryHubProtocol : IHubProtocol private void LogReadSingleArgument(ReadOnlySequence argSlice, int argLength, Type targetType) { if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; + var segmentCount = 0; - foreach (var _ in argSlice) - segmentCount++; + foreach (var _ in argSlice) segmentCount++; + _logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}", argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name); } @@ -465,8 +501,10 @@ public class AcBinaryHubProtocol : IHubProtocol private void LogParseInvocation(string target, IReadOnlyList paramTypes, long remaining) { if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return; + var typeNames = new string[paramTypes.Count]; for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name; + _logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}", target, paramTypes.Count, string.Join(", ", typeNames), remaining); } @@ -483,13 +521,9 @@ public class AcBinaryHubProtocol : IHubProtocol var streamIds = ReadStringArray(ref r); var headers = ReadHeaders(ref r); - var msg = streamIds is { Length: > 0 } - ? new InvocationMessage(invocationId, target, args, streamIds) - : ApplyInvocationId(new InvocationMessage(target, args), invocationId); - - if (headers != null) - SetHeaders(msg, headers); + var msg = streamIds is { Length: > 0 } ? new InvocationMessage(invocationId, target, args, streamIds) : ApplyInvocationId(new InvocationMessage(target, args), invocationId); + if (headers != null) SetHeaders(msg, headers); return msg; } @@ -503,8 +537,7 @@ public class AcBinaryHubProtocol : IHubProtocol var headers = ReadHeaders(ref r); var msg = new StreamInvocationMessage(invocationId, target, args, streamIds); - if (headers != null) - SetHeaders(msg, headers); + if (headers != null) SetHeaders(msg, headers); return msg; } @@ -517,8 +550,7 @@ public class AcBinaryHubProtocol : IHubProtocol var headers = ReadHeaders(ref r); var msg = new StreamItemMessage(invocationId, item); - if (headers != null) - SetHeaders(msg, headers); + if (headers != null) SetHeaders(msg, headers); return msg; } @@ -527,7 +559,8 @@ public class AcBinaryHubProtocol : IHubProtocol { var invocationId = ReadString(ref r); var error = ReadNullableString(ref r); - r.TryRead(out byte hasResultByte); + + r.TryRead(out var hasResultByte); var hasResult = hasResultByte == 1; object? result = null; @@ -540,15 +573,11 @@ public class AcBinaryHubProtocol : IHubProtocol var headers = ReadHeaders(ref r); CompletionMessage msg; - if (error != null) - msg = CompletionMessage.WithError(invocationId, error); - else if (hasResult) - msg = CompletionMessage.WithResult(invocationId, result); - else - msg = CompletionMessage.Empty(invocationId); + if (error != null) msg = CompletionMessage.WithError(invocationId, error); + else if (hasResult) msg = CompletionMessage.WithResult(invocationId, result); + else msg = CompletionMessage.Empty(invocationId); - if (headers != null) - SetHeaders(msg, headers); + if (headers != null) SetHeaders(msg, headers); return msg; } @@ -559,8 +588,7 @@ public class AcBinaryHubProtocol : IHubProtocol var headers = ReadHeaders(ref r); var msg = new CancelInvocationMessage(invocationId); - if (headers != null) - SetHeaders(msg, headers); + if (headers != null) SetHeaders(msg, headers); return msg; } @@ -568,7 +596,8 @@ public class AcBinaryHubProtocol : IHubProtocol private static HubMessage ParseClose(ref SequenceReader r) { var error = ReadNullableString(ref r); - r.TryRead(out byte reconnectByte); + r.TryRead(out var reconnectByte); + var allowReconnect = reconnectByte == 1; return new CloseMessage(error, allowReconnect); } @@ -606,24 +635,28 @@ public class AcBinaryHubProtocol : IHubProtocol var totalNeeded = 3 + chunkDataSize; // header (3) + data if (input.Length < totalNeeded) return false; + _logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length); + // Write chunk data to internal pipe for background deserialization if (chunkDataSize > 0) { var dataSlice = input.Slice(3, chunkDataSize); foreach (var segment in dataSlice) state.InternalPipe.Writer.Write(segment.Span); - state.InternalPipe.Writer.FlushAsync().GetAwaiter().GetResult(); + SyncFlush(state.InternalPipe.Writer.FlushAsync()); } // Lazy start: begin background deserialization after first chunk is in the pipe. // Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available. if (state.DeserTask == null) { + _logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name); + var pipeReader = state.InternalPipe.Reader; var type = state.StreamedArgType; var opts = _options; - state.DeserTask = Task.Run(() => - (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts)); + + state.DeserTask = Task.Run(() => (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts)); } input = input.Slice(totalNeeded); @@ -632,25 +665,32 @@ public class AcBinaryHubProtocol : IHubProtocol if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data) { - // Signal end of data → background deser task completes - state.InternalPipe.Writer.Complete(); - object? deserializedArg = null; - if (state.DeserTask != null) - { - deserializedArg = state.DeserTask.GetAwaiter().GetResult(); - state.InternalPipe.Reader.Complete(); + _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — completing pipe"); + + // Signal end of data → background deser task completes + state.InternalPipe.Writer.Complete(); + object? deserializedArg = null; + + if (state.DeserTask != null) + { + deserializedArg = state.DeserTask.GetAwaiter().GetResult(); + state.InternalPipe.Reader.Complete(); + + if (_logger!.IsEnabled(LogLevel.Debug)) _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); + } + + // Fill the placeholder in the stored message's args + FillStreamedArg(state, deserializedArg); + _chunkStates!.Remove(binder); + + input = input.Slice(1); // consume the single [202] byte + message = state.PartialMessage; + + return true; } - // Fill the placeholder in the stored message's args - FillStreamedArg(state, deserializedArg); - - _chunkStates!.Remove(binder); - input = input.Slice(1); // consume the single [202] byte - message = state.PartialMessage; - return true; - } - // Unknown byte in chunk mode — break out (shouldn't happen) + _logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking", firstByte); break; } @@ -664,7 +704,9 @@ public class AcBinaryHubProtocol : IHubProtocol /// private HubMessage? ParseAsyncChunkStart(ref SequenceReader r, IInvocationBinder binder) { - r.TryRead(out byte originalMsgType); + r.TryRead(out var originalMsgType); + + _logger?.LogDebug("ParseAsyncChunkStart innerMsgType={InnerMsgType}", originalMsgType); // Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments var partialMessage = originalMsgType switch @@ -680,6 +722,10 @@ public class AcBinaryHubProtocol : IHubProtocol // Find the placeholder arg and its target type var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder); + streamedType = ResolveStreamedArgType(streamedType); + + _logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}", + streamedIndex, streamedType.Name); var state = new AsyncChunkState { @@ -699,61 +745,59 @@ public class AcBinaryHubProtocol : IHubProtocol /// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array, /// placeholder index, and the target deserialization type. /// - private static (object?[] args, int index, Type type) FindStreamedArgSlot( - HubMessage message, IInvocationBinder binder) + private static (object?[] args, int index, Type type) FindStreamedArgSlot(HubMessage message, IInvocationBinder binder) { switch (message) { case InvocationMessage inv: - { - var paramTypes = binder.GetParameterTypes(inv.Target); - for (var i = 0; i < inv.Arguments.Length; i++) { - if (ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) + var paramTypes = binder.GetParameterTypes(inv.Target); + for (var i = 0; i < inv.Arguments.Length; i++) { + if (!ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) continue; + var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); return (inv.Arguments, i, type); } + break; } - break; - } case StreamInvocationMessage sinv: - { - var paramTypes = binder.GetParameterTypes(sinv.Target); - for (var i = 0; i < sinv.Arguments.Length; i++) { - if (ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) + var paramTypes = binder.GetParameterTypes(sinv.Target); + for (var i = 0; i < sinv.Arguments.Length; i++) { + if (!ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) continue; + var type = i < paramTypes.Count ? paramTypes[i] : typeof(object); return (sinv.Arguments, i, type); } + break; } - break; - } case StreamItemMessage si: - { - if (ReferenceEquals(si.Item, StreamedArgPlaceholder)) { - // StreamItemMessage.Item is read-only, use a wrapper array - var args = new object?[] { si.Item }; - var type = binder.GetStreamItemType(si.InvocationId!); - return (args, 0, type); + if (ReferenceEquals(si.Item, StreamedArgPlaceholder)) + { + // StreamItemMessage.Item is read-only, use a wrapper array + var args = new object?[] { si.Item }; + var type = binder.GetStreamItemType(si.InvocationId!); + return (args, 0, type); + } + break; } - break; - } case CompletionMessage comp: - { - if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder)) { - var args = new object?[] { comp.Result }; - var type = binder.GetReturnType(comp.InvocationId!); - return (args, 0, type); + if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder)) + { + var args = new object?[] { comp.Result }; + var type = binder.GetReturnType(comp.InvocationId!); + + return (args, 0, type); + } + break; } - break; - } } - return (Array.Empty(), -1, typeof(object)); + return ([], -1, typeof(object)); } /// @@ -773,13 +817,11 @@ public class AcBinaryHubProtocol : IHubProtocol break; case StreamItemMessage: // StreamItemMessage.Item has no public setter — need to create a new message - if (state.PartialMessage is StreamItemMessage si) - state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue); + if (state.PartialMessage is StreamItemMessage si) state.PartialMessage = new StreamItemMessage(si.InvocationId!, deserializedValue); break; case CompletionMessage: // CompletionMessage.Result has no public setter — need to create a new message - if (state.PartialMessage is CompletionMessage comp) - state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue); + if (state.PartialMessage is CompletionMessage comp) state.PartialMessage = CompletionMessage.WithResult(comp.InvocationId!, deserializedValue); break; } } @@ -791,8 +833,8 @@ public class AcBinaryHubProtocol : IHubProtocol private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter output, object?[] arguments, ref int externalBytes) { bw.WriteVarUInt((uint)arguments.Length); - for (var i = 0; i < arguments.Length; i++) - WriteArgument(ref bw, output, arguments[i], ref externalBytes); + + for (var i = 0; i < arguments.Length; i++) WriteArgument(ref bw, output, arguments[i], ref externalBytes); } private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter output, object? value, ref int externalBytes) @@ -826,6 +868,7 @@ public class AcBinaryHubProtocol : IHubProtocol var serialized = AcBinarySerializer.Serialize(value, _options); bw.WriteRaw(serialized.Length); bw.WriteBytes(serialized); + return; } @@ -866,6 +909,12 @@ public class AcBinaryHubProtocol : IHubProtocol protected virtual void OnArgumentRead(object? value, int index) { } + /// + /// Override to resolve typeof(object) to a concrete type (e.g., from SignalParams). + /// Called after FindStreamedArgSlot in chunked deserialization. + /// + protected virtual Type ResolveStreamedArgType(Type binderType) => binderType; + /// /// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer. /// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[]. @@ -874,17 +923,19 @@ public class AcBinaryHubProtocol : IHubProtocol protected virtual object? ReadSingleArgument(ref SequenceReader r, Type targetType) { r.TryReadLittleEndian(out int argLength); - if (argLength == 0) - return null; + if (argLength == 0) return null; // AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization if (argLength == -1) + { + _logger?.LogTrace("ReadSingleArgument streamed arg marker (-1) → placeholder"); return StreamedArgPlaceholder; + } // Null marker check if (argLength == 1) { - r.TryPeek(out byte marker); + r.TryPeek(out var marker); if (marker == 0) { r.Advance(1); return null; } } @@ -897,7 +948,7 @@ public class AcBinaryHubProtocol : IHubProtocol // byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag → // strip tag, rest is raw payload. No VarUInt length (argLength implies size). var argReader = new SequenceReader(argSlice); - if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray) + if (argReader.TryPeek(out var tag) && tag == BinaryTypeCode.ByteArray) { return SequenceToByteArray(argSlice.Slice(1)); } @@ -919,8 +970,7 @@ public class AcBinaryHubProtocol : IHubProtocol [MethodImpl(MethodImplOptions.AggressiveInlining)] protected static byte[] SequenceToByteArray(ReadOnlySequence data) { - if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg) - && seg.Offset == 0 && seg.Count == seg.Array!.Length) + if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg) && seg.Offset == 0 && seg.Count == seg.Array!.Length) return seg.Array; return data.ToArray(); @@ -1003,7 +1053,7 @@ public class AcBinaryHubProtocol : IHubProtocol { uint value = 0; var shift = 0; - while (r.TryRead(out byte b)) + while (r.TryRead(out var b)) { value |= (uint)(b & 0x7F) << shift; if ((b & 0x80) == 0) @@ -1027,7 +1077,7 @@ public class AcBinaryHubProtocol : IHubProtocol private static string? ReadNullableString(ref SequenceReader r) { - r.TryRead(out byte marker); + r.TryRead(out var marker); return marker == 0 ? null : ReadString(ref r); } diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs index 0f06b42..0ca3c47 100644 --- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs @@ -74,4 +74,15 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol return DeserializeFromSequence(argSlice, targetType, Options); } + + protected override Type ResolveStreamedArgType(Type binderType) + { + if (binderType == typeof(object) && _currentSignalParams?.SignalDataType != null) + { + var resolved = Type.GetType(_currentSignalParams.SignalDataType); + if (resolved != null) + return resolved; + } + return binderType; + } }