Refactor logging and unify argument deserialization

- Simplified logging with null-conditional operators
- Temporarily disabled WASM AsyncSegment guard for testing
- Unified argument deserialization via GetArgBytes for zero-copy and pooled buffer support
- Removed DeserializeFromSequence in favor of new approach
- Applied improvements to AyCodeBinaryHubProtocol
- Updated comments and performed minor code cleanup
This commit is contained in:
Loretta 2026-04-20 14:20:34 +02:00
parent dc16f493d5
commit 939ce9c39b
2 changed files with 69 additions and 57 deletions

View File

@ -115,10 +115,14 @@ public class AcBinaryHubProtocol : IHubProtocol
// Send-side guard: AsyncSegment uses AsyncPipeWriterOutput whose sync-over-async flush // Send-side guard: AsyncSegment uses AsyncPipeWriterOutput whose sync-over-async flush
// would block the browser's single UI thread. The receive side converts chunked wire // would block the browser's single UI thread. The receive side converts chunked wire
// to a synchronous deserialize on WASM automatically (see TryParseChunkData). // to a synchronous deserialize on WASM automatically (see TryParseChunkData).
if (IsBrowser && protocolMode == BinaryProtocolMode.AsyncSegment) //
throw new PlatformNotSupportedException( // TEMP: commented out to test AsyncSegment on both Windows app and WASM without rebuild.
"BinaryProtocolMode.AsyncSegment is not supported on WebAssembly. " + // Small WASM payloads work; larger ones may deadlock on sync-over-async FlushAsync.
"Use BinaryProtocolMode.Bytes or BinaryProtocolMode.Segment instead."); // Restore once BinaryProtocolMode is runtime-configurable in Program.cs.
//if (IsBrowser && protocolMode == BinaryProtocolMode.AsyncSegment)
// throw new PlatformNotSupportedException(
// "BinaryProtocolMode.AsyncSegment is not supported on WebAssembly. " +
// "Use BinaryProtocolMode.Bytes or BinaryProtocolMode.Segment instead.");
_options = options; _options = options;
_options.BufferWriterChunkSize = 4096; _options.BufferWriterChunkSize = 4096;
@ -208,10 +212,7 @@ public class AcBinaryHubProtocol : IHubProtocol
public void WriteMessage(HubMessage message, IBufferWriter<byte> output) public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
{ {
if (_logger != null) _logger?.LogInformation("Serialize start");
{
_logger.LogInformation("Serialize start");
}
// AsyncSegment: chunked protocol framing for messages with streamable arguments // AsyncSegment: chunked protocol framing for messages with streamable arguments
if (_protocolMode == BinaryProtocolMode.AsyncSegment if (_protocolMode == BinaryProtocolMode.AsyncSegment
@ -277,13 +278,10 @@ public class AcBinaryHubProtocol : IHubProtocol
bw.Flush(); bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
if (_logger != null) _logger?.LogInformation("Serialize end totalSentSize={TotalSentSize}", LengthPrefixSize + totalPayload);
{
_logger.LogInformation("Serialize end totalSentSize={TotalSentSize}", LengthPrefixSize + totalPayload);
if (_logger.IsEnabled(LogLevel.Debug)) if (_logger?.IsEnabled(LogLevel.Debug) == true)
_logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload); _logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload);
}
} }
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes) private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
@ -493,11 +491,8 @@ public class AcBinaryHubProtocol : IHubProtocol
var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0; var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0;
var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1; var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1;
if (_logger != null) _logger?.LogInformation("Serialize end (chunked) dataBytes={DataBytes} chunkCount={ChunkCount} totalSentSize={TotalSentSize}",
{ dataBytes, chunkCount, totalSentSize);
_logger.LogInformation("Serialize end (chunked) dataBytes={DataBytes} chunkCount={ChunkCount} totalSentSize={TotalSentSize}",
dataBytes, chunkCount, totalSentSize);
}
} }
/// <summary> /// <summary>
@ -569,11 +564,7 @@ public class AcBinaryHubProtocol : IHubProtocol
return false; return false;
_logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length); _logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length);
_logger?.LogInformation("Deserialize start");
if (_logger != null)
{
_logger.LogInformation("Deserialize start");
}
message = ParseMessage(ref reader, payloadLength, binder); message = ParseMessage(ref reader, payloadLength, binder);
@ -581,13 +572,9 @@ public class AcBinaryHubProtocol : IHubProtocol
{ {
input = input.Slice(LengthPrefixSize + payloadLength); input = input.Slice(LengthPrefixSize + payloadLength);
if (_logger != null) _logger?.LogInformation("Deserialize end");
{ if (_logger?.IsEnabled(LogLevel.Debug) == true) _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name);
_logger.LogInformation("Deserialize end");
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name);
}
return true; return true;
} }
@ -884,20 +871,22 @@ public class AcBinaryHubProtocol : IHubProtocol
} }
else else
{ {
// Browser (WASM) fallback: all chunks are buffered, state.Buffer.Complete() // Browser (WASM) fallback: all chunks are buffered into a single contiguous byte[]
// has been called above, so the synchronous deserializer reads through the // inside SegmentBufferReader. Use ArrayBinaryInput via the offset-aware overload —
// completed buffer without any Monitor.Wait. // strictly faster than SegmentBufferReaderInput here (JIT eliminates
// TryAdvanceSegment, no volatile reads, no cross-boundary branching).
deserializedArg = AcBinaryDeserializer.Deserialize( deserializedArg = AcBinaryDeserializer.Deserialize(
state.Buffer, state.StreamedArgType, _options); state.Buffer.Buffer,
0,
state.Buffer.WritePos,
state.StreamedArgType,
_options);
} }
if (_logger != null) _logger?.LogInformation("Deserialize end (chunked)");
{
_logger.LogInformation("Deserialize end (chunked)");
if (_logger.IsEnabled(LogLevel.Debug)) if (_logger?.IsEnabled(LogLevel.Debug) == true)
_logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null");
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -1198,14 +1187,18 @@ public class AcBinaryHubProtocol : IHubProtocol
return SequenceToByteArray(argSlice.Slice(1)); return SequenceToByteArray(argSlice.Slice(1));
} }
// Bytes mode: linearize to byte[] → ArrayBinaryInput (fastest deser, no segment overhead) // Unified non-chunked receive path: always ArrayBinaryInput via offset-aware overload.
if (_protocolMode == BinaryProtocolMode.Bytes) // Single-segment: zero-copy on the pipe's slab. Multi-segment: pool-rented copy.
// _protocolMode no longer affects the receive side — it is only a send-side strategy.
var (arr, offset, length, rented) = GetArgBytes(argSlice);
try
{ {
var bytes = SequenceToByteArray(argSlice); return AcBinaryDeserializer.Deserialize(arr, offset, length, targetType, _options);
return AcBinaryDeserializer.Deserialize(bytes, targetType, _options); }
finally
{
if (rented) ArrayPool<byte>.Shared.Return(arr);
} }
return DeserializeFromSequence(argSlice, targetType, _options);
} }
/// <summary> /// <summary>
@ -1222,12 +1215,27 @@ public class AcBinaryHubProtocol : IHubProtocol
} }
/// <summary> /// <summary>
/// Deserializes from a ReadOnlySequence via AcBinaryDeserializer. /// Exposes argSlice bytes as (array, offset, length) for offset-aware
/// Single-segment: zero-copy via ArrayBinaryInput. Multi-segment: SequenceBinaryInput (no copy). /// <see cref="AcBinaryDeserializer.Deserialize(byte[], int, int, Type, AcBinarySerializerOptions)"/>.
/// <list type="bullet">
/// <item>Single-segment: zero-copy via <see cref="MemoryMarshal.TryGetArray{T}"/> — no allocation, no copy.</item>
/// <item>Multi-segment: <see cref="ArrayPool{T}"/>-rented contiguous copy; caller MUST return
/// the array via <see cref="ArrayPool{T}.Return"/> when <c>rented</c> is <c>true</c>.</item>
/// </list>
/// Enables ArrayBinaryInput (fastest — JIT-eliminates the TryAdvanceSegment branch) regardless
/// of whether the pipe delivered the payload as a single slab or multiple.
/// </summary> /// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static object? DeserializeFromSequence(ReadOnlySequence<byte> data, Type targetType, AcBinarySerializerOptions options) protected static (byte[] array, int offset, int length, bool rented) GetArgBytes(ReadOnlySequence<byte> argSlice)
=> AcBinaryDeserializer.Deserialize(data, targetType, options); {
if (argSlice.IsSingleSegment && MemoryMarshal.TryGetArray(argSlice.First, out var seg))
return (seg.Array!, seg.Offset, seg.Count, rented: false);
var length = (int)argSlice.Length;
var rentedBuf = ArrayPool<byte>.Shared.Rent(length);
argSlice.CopyTo(rentedBuf);
return (rentedBuf, 0, length, rented: true);
}
#endregion #endregion

View File

@ -196,13 +196,17 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
if (targetType == typeof(object) && hctx?.Type != null) if (targetType == typeof(object) && hctx?.Type != null)
targetType = hctx.Type; targetType = hctx.Type;
// 4. Deserialize — Bytes mode linearizes, Segment/AsyncSegment uses the sequence directly // 4. Deserialize — unified ArrayBinaryInput path via GetArgBytes.
if (_protocolMode == BinaryProtocolMode.Bytes) // Single-segment: zero-copy on the pipe's slab. Multi-segment: ArrayPool-rented copy.
// _protocolMode no longer affects receive — it is only a send-side strategy.
var (arr, offset, length, rented) = GetArgBytes(argSlice);
try
{ {
var bytes = SequenceToByteArray(argSlice); return AcBinaryDeserializer.Deserialize(arr, offset, length, targetType, Options);
return AcBinaryDeserializer.Deserialize(bytes, targetType, Options); }
finally
{
if (rented) ArrayPool<byte>.Shared.Return(arr);
} }
return DeserializeFromSequence(argSlice, targetType, Options);
} }
} }