diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
index 8b953a9..7dc044c 100644
--- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
+++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
@@ -63,6 +63,18 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
_lastFlush = default;
}
+ ///
+ /// Synchronously awaits a FlushAsync ValueTask.
+ /// Fast-path: if already completed, returns without Task allocation.
+ /// Slow-path: converts to Task for proper blocking (backpressure).
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static void SyncAwaitFlush(ValueTask vt)
+ {
+ if (!vt.IsCompletedSuccessfully)
+ vt.AsTask().GetAwaiter().GetResult();
+ }
+
///
/// Provides the initial buffer from the PipeWriter with 3-byte header reservation.
///
@@ -82,9 +94,11 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
[MethodImpl(MethodImplOptions.NoInlining)]
public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed)
{
- // Backpressure: wait for previous flush if still in progress
- if (_waitForFlush && !_lastFlush.IsCompleted)
- _lastFlush.GetAwaiter().GetResult();
+ // Backpressure: wait for previous flush if still in progress,
+ // or if committed bytes approach the Pipe's PauseWriterThreshold (~64KB)
+ // to prevent unbounded memory growth in waitForFlush=false mode.
+ if ((_waitForFlush && !_lastFlush.IsCompleted) || _committedBytes > MaxChunkSize - _chunkSize)
+ SyncAwaitFlush(_lastFlush);
CommitCurrentChunk(buffer, position);
@@ -115,8 +129,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
public void Flush(byte[] buffer, int position)
{
// Wait for any in-flight flush from previous Grow
- if (!_lastFlush.IsCompleted)
- _lastFlush.GetAwaiter().GetResult();
+ SyncAwaitFlush(_lastFlush);
CommitCurrentChunk(buffer, position);
}
diff --git a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs
index 86f158c..516e35d 100644
--- a/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs
+++ b/AyCode.Core/Serializers/Binaries/PipeReaderBinaryInput.cs
@@ -24,6 +24,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
private SequencePosition _nextSegmentPosition;
private SequencePosition _consumedUpTo;
private bool _pipeCompleted;
+ private bool _hasActiveRead;
// Cross-boundary scratch — same pattern as SequenceBinaryInput
private byte[]? _scratchBuffer;
@@ -32,6 +33,15 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
private int _savedPosition;
private int _savedBufferLength;
+ ///
+ /// Synchronously gets the result of a PipeReader.ReadAsync ValueTask.
+ /// Fast-path: if already completed (data in pipe), returns directly without Task allocation.
+ /// Slow-path: converts to Task for proper blocking when waiting for writer.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ReadResult SyncReadResult(ValueTask vt)
+ => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
+
public PipeReaderBinaryInput(PipeReader pipeReader)
{
_pipeReader = pipeReader;
@@ -39,6 +49,7 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
_nextSegmentPosition = default;
_consumedUpTo = default;
_pipeCompleted = false;
+ _hasActiveRead = false;
_scratchBuffer = null;
_afterCrossBoundary = false;
_savedBuffer = null;
@@ -51,9 +62,10 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
///
public void Initialize(out byte[] buffer, out int position, out int bufferLength)
{
- var result = _pipeReader.ReadAsync().GetAwaiter().GetResult();
+ var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted;
+ _hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start;
@@ -101,10 +113,12 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
return false;
_pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
+ _hasActiveRead = false;
- var result = _pipeReader.ReadAsync().GetAwaiter().GetResult();
+ var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted;
+ _hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start;
@@ -128,7 +142,11 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
_scratchBuffer = null;
}
- _pipeReader.AdvanceTo(_currentBuffer.End);
+ if (_hasActiveRead)
+ {
+ _pipeReader.AdvanceTo(_currentBuffer.End);
+ _hasActiveRead = false;
+ }
}
private bool TryLoadNextSegmentFromBuffer(ref byte[] buffer, ref int position, ref int bufferLength)
@@ -178,9 +196,11 @@ public struct PipeReaderBinaryInput : IBinaryInputBase
return false;
_pipeReader.AdvanceTo(_consumedUpTo, _currentBuffer.End);
- var result = _pipeReader.ReadAsync().GetAwaiter().GetResult();
+ _hasActiveRead = false;
+ var result = SyncReadResult(_pipeReader.ReadAsync());
_currentBuffer = result.Buffer;
_pipeCompleted = result.IsCompleted;
+ _hasActiveRead = true;
_consumedUpTo = _currentBuffer.Start;
_nextSegmentPosition = _currentBuffer.Start;
diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
index 944924b..55db0ab 100644
--- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
+++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
@@ -100,6 +100,15 @@ public class AcBinaryHubProtocol : IHubProtocol
public int Version => 1;
public TransferFormat TransferFormat => TransferFormat.Binary;
+ ///
+ /// Synchronously gets the result of a PipeWriter.FlushAsync ValueTask.
+ /// Fast-path: if already completed (no backpressure), returns directly without Task allocation.
+ /// Slow-path: converts to Task for proper blocking when pipe backpressure is active.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static FlushResult SyncFlush(ValueTask vt)
+ => vt.IsCompletedSuccessfully ? vt.Result : vt.AsTask().GetAwaiter().GetResult();
+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsVersionSupported(int version) => version <= Version;
@@ -130,7 +139,7 @@ public class AcBinaryHubProtocol : IHubProtocol
output.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize);
- int externalBytes = 0;
+ var externalBytes = 0;
switch (message)
{
@@ -179,6 +188,9 @@ public class AcBinaryHubProtocol : IHubProtocol
var totalPayload = bw.Position + externalBytes;
bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
+
+ if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
+ _logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload);
}
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, InvocationMessage m, ref int externalBytes)
@@ -296,13 +308,17 @@ public class AcBinaryHubProtocol : IHubProtocol
{
var (streamedArg, streamedArgIndex) = GetStreamedArg(message);
+ if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
+ _logger.LogDebug("WriteMessageChunked {MessageType} streamedArgIndex={StreamedArgIndex} streamedArgType={StreamedArgType}",
+ message.GetType().Name, streamedArgIndex, streamedArg?.GetType().Name ?? "null");
+
// --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) ---
{
var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize);
pipeWriter.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(pipeWriter, _options.BufferWriterChunkSize);
- int externalBytes = 0;
+ var externalBytes = 0;
bw.WriteByte(MsgAsyncChunkStart);
@@ -347,18 +363,25 @@ public class AcBinaryHubProtocol : IHubProtocol
var totalPayload = bw.Position + externalBytes;
bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
+
+ _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", totalPayload);
}
- pipeWriter.FlushAsync().GetAwaiter().GetResult();
+ SyncFlush(pipeWriter.FlushAsync());
// --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) ---
if (streamedArg != null)
- AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options);
+ {
+ var dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options);
+ _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes);
+ }
// --- CHUNK_END [202] ---
var endByte = pipeWriter.GetSpan(1);
endByte[0] = MsgAsyncChunkEnd;
pipeWriter.Advance(1);
- pipeWriter.FlushAsync().GetAwaiter().GetResult();
+ SyncFlush(pipeWriter.FlushAsync());
+
+ _logger?.LogTrace("WriteMessageChunked CHUNK_END written");
}
///
@@ -389,7 +412,10 @@ public class AcBinaryHubProtocol : IHubProtocol
// AsyncSegment chunk mode: non-standard framing (no INT32 length prefix)
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out var chunkState))
+ {
+ _logger?.LogTrace("TryParseMessage chunk mode active, inputLength={InputLength}", input.Length);
return TryParseChunkData(ref input, chunkState, binder, out message);
+ }
// Normal path
var reader = new SequenceReader(input);
@@ -399,18 +425,27 @@ public class AcBinaryHubProtocol : IHubProtocol
if (reader.Remaining < payloadLength)
return false;
+ _logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length);
+
message = ParseMessage(ref reader, payloadLength, binder);
input = input.Slice(LengthPrefixSize + payloadLength);
if (message != null)
+ {
+ if (_logger != null && _logger.IsEnabled(LogLevel.Debug))
+ _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name);
return true;
+ }
// CHUNK_START consumed but no message yet — chunk mode just activated.
// Must try chunk data immediately; returning false here would cause SignalR
// to call AdvanceTo(examined=end) and wait for new data, even though
// CHUNK_DATA/CHUNK_END may already be in the remaining buffer.
if (_chunkStates != null && _chunkStates.TryGetValue(binder, out chunkState))
+ {
+ _logger?.LogDebug("TryParseMessage CHUNK_START activated, fallthrough to TryParseChunkData remainingInput={RemainingInput}", input.Length);
return TryParseChunkData(ref input, chunkState, binder, out message);
+ }
return false;
}
@@ -423,7 +458,7 @@ public class AcBinaryHubProtocol : IHubProtocol
// Mark end position so Parse* methods can check Remaining relative to payload
var payloadEnd = r.Consumed + payloadLength;
- r.TryRead(out byte msgType);
+ r.TryRead(out var msgType);
return msgType switch
{
@@ -454,9 +489,10 @@ public class AcBinaryHubProtocol : IHubProtocol
private void LogReadSingleArgument(ReadOnlySequence argSlice, int argLength, Type targetType)
{
if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
+
var segmentCount = 0;
- foreach (var _ in argSlice)
- segmentCount++;
+ foreach (var _ in argSlice) segmentCount++;
+
_logger.LogDebug("[AcBinaryHubProtocol] ReadSingleArgument: argLength={ArgLength}, isSingleSegment={IsSingleSegment}, segments={SegmentCount}, type={TypeName}",
argLength, argSlice.IsSingleSegment, segmentCount, targetType.Name);
}
@@ -465,8 +501,10 @@ public class AcBinaryHubProtocol : IHubProtocol
private void LogParseInvocation(string target, IReadOnlyList paramTypes, long remaining)
{
if (_logger == null || !_logger.IsEnabled(LogLevel.Debug)) return;
+
var typeNames = new string[paramTypes.Count];
for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name;
+
_logger.LogDebug("[AcBinaryHubProtocol] ParseInvocation target='{Target}'; paramTypes.Count={ParamCount}; types=[{Types}]; remaining={Remaining}",
target, paramTypes.Count, string.Join(", ", typeNames), remaining);
}
@@ -483,13 +521,9 @@ public class AcBinaryHubProtocol : IHubProtocol
var streamIds = ReadStringArray(ref r);
var headers = ReadHeaders(ref r);
- var msg = streamIds is { Length: > 0 }
- ? new InvocationMessage(invocationId, target, args, streamIds)
- : ApplyInvocationId(new InvocationMessage(target, args), invocationId);
-
- if (headers != null)
- SetHeaders(msg, headers);
+ var msg = streamIds is { Length: > 0 } ? new InvocationMessage(invocationId, target, args, streamIds) : ApplyInvocationId(new InvocationMessage(target, args), invocationId);
+ if (headers != null) SetHeaders(msg, headers);
return msg;
}
@@ -503,8 +537,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r);
var msg = new StreamInvocationMessage(invocationId, target, args, streamIds);
- if (headers != null)
- SetHeaders(msg, headers);
+ if (headers != null) SetHeaders(msg, headers);
return msg;
}
@@ -517,8 +550,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r);
var msg = new StreamItemMessage(invocationId, item);
- if (headers != null)
- SetHeaders(msg, headers);
+ if (headers != null) SetHeaders(msg, headers);
return msg;
}
@@ -527,7 +559,8 @@ public class AcBinaryHubProtocol : IHubProtocol
{
var invocationId = ReadString(ref r);
var error = ReadNullableString(ref r);
- r.TryRead(out byte hasResultByte);
+
+ r.TryRead(out var hasResultByte);
var hasResult = hasResultByte == 1;
object? result = null;
@@ -540,15 +573,11 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r);
CompletionMessage msg;
- if (error != null)
- msg = CompletionMessage.WithError(invocationId, error);
- else if (hasResult)
- msg = CompletionMessage.WithResult(invocationId, result);
- else
- msg = CompletionMessage.Empty(invocationId);
+ if (error != null) msg = CompletionMessage.WithError(invocationId, error);
+ else if (hasResult) msg = CompletionMessage.WithResult(invocationId, result);
+ else msg = CompletionMessage.Empty(invocationId);
- if (headers != null)
- SetHeaders(msg, headers);
+ if (headers != null) SetHeaders(msg, headers);
return msg;
}
@@ -559,8 +588,7 @@ public class AcBinaryHubProtocol : IHubProtocol
var headers = ReadHeaders(ref r);
var msg = new CancelInvocationMessage(invocationId);
- if (headers != null)
- SetHeaders(msg, headers);
+ if (headers != null) SetHeaders(msg, headers);
return msg;
}
@@ -568,7 +596,8 @@ public class AcBinaryHubProtocol : IHubProtocol
private static HubMessage ParseClose(ref SequenceReader r)
{
var error = ReadNullableString(ref r);
- r.TryRead(out byte reconnectByte);
+ r.TryRead(out var reconnectByte);
+
var allowReconnect = reconnectByte == 1;
return new CloseMessage(error, allowReconnect);
}
@@ -606,24 +635,28 @@ public class AcBinaryHubProtocol : IHubProtocol
var totalNeeded = 3 + chunkDataSize; // header (3) + data
if (input.Length < totalNeeded) return false;
+ _logger?.LogTrace("TryParseChunkData [201] chunkDataSize={ChunkDataSize} inputLength={InputLength}", chunkDataSize, input.Length);
+
// Write chunk data to internal pipe for background deserialization
if (chunkDataSize > 0)
{
var dataSlice = input.Slice(3, chunkDataSize);
foreach (var segment in dataSlice)
state.InternalPipe.Writer.Write(segment.Span);
- state.InternalPipe.Writer.FlushAsync().GetAwaiter().GetResult();
+ SyncFlush(state.InternalPipe.Writer.FlushAsync());
}
// Lazy start: begin background deserialization after first chunk is in the pipe.
// Must not start earlier — PipeReaderBinaryInput.ReadAsync needs data available.
if (state.DeserTask == null)
{
+ _logger?.LogDebug("TryParseChunkData starting background deserialization targetType={TargetType}", state.StreamedArgType.Name);
+
var pipeReader = state.InternalPipe.Reader;
var type = state.StreamedArgType;
var opts = _options;
- state.DeserTask = Task.Run(() =>
- (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts));
+
+ state.DeserTask = Task.Run(() => (object?)AcBinaryDeserializer.Deserialize(pipeReader, type, opts));
}
input = input.Slice(totalNeeded);
@@ -632,25 +665,32 @@ public class AcBinaryHubProtocol : IHubProtocol
if (firstByte == MsgAsyncChunkEnd) // 202 — end signal (no data)
{
- // Signal end of data → background deser task completes
- state.InternalPipe.Writer.Complete();
- object? deserializedArg = null;
- if (state.DeserTask != null)
- {
- deserializedArg = state.DeserTask.GetAwaiter().GetResult();
- state.InternalPipe.Reader.Complete();
+ _logger?.LogDebug("TryParseChunkData [202] CHUNK_END — completing pipe");
+
+ // Signal end of data → background deser task completes
+ state.InternalPipe.Writer.Complete();
+ object? deserializedArg = null;
+
+ if (state.DeserTask != null)
+ {
+ deserializedArg = state.DeserTask.GetAwaiter().GetResult();
+ state.InternalPipe.Reader.Complete();
+
+ if (_logger!.IsEnabled(LogLevel.Debug)) _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null");
+ }
+
+ // Fill the placeholder in the stored message's args
+ FillStreamedArg(state, deserializedArg);
+ _chunkStates!.Remove(binder);
+
+ input = input.Slice(1); // consume the single [202] byte
+ message = state.PartialMessage;
+
+ return true;
}
- // Fill the placeholder in the stored message's args
- FillStreamedArg(state, deserializedArg);
-
- _chunkStates!.Remove(binder);
- input = input.Slice(1); // consume the single [202] byte
- message = state.PartialMessage;
- return true;
- }
-
// Unknown byte in chunk mode — break out (shouldn't happen)
+ _logger?.LogWarning("TryParseChunkData unknown byte {FirstByte} in chunk mode, breaking", firstByte);
break;
}
@@ -664,7 +704,9 @@ public class AcBinaryHubProtocol : IHubProtocol
///
private HubMessage? ParseAsyncChunkStart(ref SequenceReader r, IInvocationBinder binder)
{
- r.TryRead(out byte originalMsgType);
+ r.TryRead(out var originalMsgType);
+
+ _logger?.LogDebug("ParseAsyncChunkStart innerMsgType={InnerMsgType}", originalMsgType);
// Parse the original message normally — -1 marker becomes StreamedArgPlaceholder in ReadArguments
var partialMessage = originalMsgType switch
@@ -680,6 +722,10 @@ public class AcBinaryHubProtocol : IHubProtocol
// Find the placeholder arg and its target type
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
+ streamedType = ResolveStreamedArgType(streamedType);
+
+ _logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
+ streamedIndex, streamedType.Name);
var state = new AsyncChunkState
{
@@ -699,61 +745,59 @@ public class AcBinaryHubProtocol : IHubProtocol
/// Finds the StreamedArgPlaceholder in the parsed message's arguments and returns the args array,
/// placeholder index, and the target deserialization type.
///
- private static (object?[] args, int index, Type type) FindStreamedArgSlot(
- HubMessage message, IInvocationBinder binder)
+ private static (object?[] args, int index, Type type) FindStreamedArgSlot(HubMessage message, IInvocationBinder binder)
{
switch (message)
{
case InvocationMessage inv:
- {
- var paramTypes = binder.GetParameterTypes(inv.Target);
- for (var i = 0; i < inv.Arguments.Length; i++)
{
- if (ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder))
+ var paramTypes = binder.GetParameterTypes(inv.Target);
+ for (var i = 0; i < inv.Arguments.Length; i++)
{
+ if (!ReferenceEquals(inv.Arguments[i], StreamedArgPlaceholder)) continue;
+
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (inv.Arguments, i, type);
}
+ break;
}
- break;
- }
case StreamInvocationMessage sinv:
- {
- var paramTypes = binder.GetParameterTypes(sinv.Target);
- for (var i = 0; i < sinv.Arguments.Length; i++)
{
- if (ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder))
+ var paramTypes = binder.GetParameterTypes(sinv.Target);
+ for (var i = 0; i < sinv.Arguments.Length; i++)
{
+ if (!ReferenceEquals(sinv.Arguments[i], StreamedArgPlaceholder)) continue;
+
var type = i < paramTypes.Count ? paramTypes[i] : typeof(object);
return (sinv.Arguments, i, type);
}
+ break;
}
- break;
- }
case StreamItemMessage si:
- {
- if (ReferenceEquals(si.Item, StreamedArgPlaceholder))
{
- // StreamItemMessage.Item is read-only, use a wrapper array
- var args = new object?[] { si.Item };
- var type = binder.GetStreamItemType(si.InvocationId!);
- return (args, 0, type);
+ if (ReferenceEquals(si.Item, StreamedArgPlaceholder))
+ {
+ // StreamItemMessage.Item is read-only, use a wrapper array
+ var args = new object?[] { si.Item };
+ var type = binder.GetStreamItemType(si.InvocationId!);
+ return (args, 0, type);
+ }
+ break;
}
- break;
- }
case CompletionMessage comp:
- {
- if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder))
{
- var args = new object?[] { comp.Result };
- var type = binder.GetReturnType(comp.InvocationId!);
- return (args, 0, type);
+ if (comp.HasResult && ReferenceEquals(comp.Result, StreamedArgPlaceholder))
+ {
+ var args = new object?[] { comp.Result };
+ var type = binder.GetReturnType(comp.InvocationId!);
+
+ return (args, 0, type);
+ }
+ break;
}
- break;
- }
}
- return (Array.Empty