SignalR: Add raw byte[] fast-path for DataSource GetAll

Implements a high-performance raw byte[] protocol path for SignalR DataSource GetAll/LoadDataSource, using a new IsRawBytesData flag in SignalParams. When enabled, the server pre-serializes response data and sends it as a byte array, which the protocol passes through without further (de)serialization. The client receives the raw bytes and deserializes as needed, avoiding double serialization/deserialization and improving performance for large payloads.

Adds SerializerType selection to DataSource, propagates SignalParams through hub and protocol layers, and updates client/server/test code to support the new path. Also includes diagnostics flags for binary serialization debugging and fixes for multi-segment buffer handling.
This commit is contained in:
Loretta 2026-04-07 00:20:52 +02:00
parent 2d04b9f8f6
commit 05808d0d13
10 changed files with 146 additions and 50 deletions

View File

@ -247,8 +247,8 @@ public class BenchmarkSignalRHub : AcWebSignalRHubBase<BenchmarkSignalRTags, Tes
protected override string? GetUserIdentifier() => "benchmark-user";
protected override ClaimsPrincipal? GetUser() => null;
protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId)
=> SendMessageToClient(_callerClient, messageTag, status, responseData, requestId);
protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId, SignalParams? clientSignalParams = null)
=> SendMessageToClient(_callerClient, messageTag, status, responseData, requestId, clientSignalParams);
}
/// <summary>

View File

@ -25,6 +25,10 @@ public struct SequenceBinaryInput : IBinaryInputBase
private byte[]? _scratchBuffer;
private int _scratchLength;
// After a cross-boundary read, the next TryAdvanceSegment must load
// the remainder of _currentSegment (already adjusted) without incrementing.
private bool _afterCrossBoundary;
/// <summary>
/// Creates a SequenceBinaryInput from a multi-segment ReadOnlySequence.
/// Pre-extracts all segments as ArraySegment for fast iteration.
@ -55,6 +59,7 @@ public struct SequenceBinaryInput : IBinaryInputBase
_currentSegment = 0;
_scratchBuffer = null;
_scratchLength = 0;
_afterCrossBoundary = false;
}
/// <summary>
@ -79,6 +84,18 @@ public struct SequenceBinaryInput : IBinaryInputBase
[MethodImpl(MethodImplOptions.NoInlining)]
public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed)
{
// After cross-boundary scratch read: load the remainder of the current segment
// (already adjusted in TryReadCrossBoundary) without incrementing.
if (_afterCrossBoundary)
{
_afterCrossBoundary = false;
var seg = _segments[_currentSegment];
buffer = seg.Array!;
position = seg.Offset;
bufferLength = seg.Offset + seg.Count;
return seg.Count > 0;
}
// Calculate remaining bytes in current segment
var remaining = bufferLength - position;
@ -93,10 +110,10 @@ public struct SequenceBinaryInput : IBinaryInputBase
if (_currentSegment >= _segments.Length)
return false;
var seg = _segments[_currentSegment];
buffer = seg.Array!;
position = seg.Offset;
bufferLength = seg.Offset + seg.Count;
var seg2 = _segments[_currentSegment];
buffer = seg2.Array!;
position = seg2.Offset;
bufferLength = seg2.Offset + seg2.Count;
return true;
}
@ -129,15 +146,15 @@ public struct SequenceBinaryInput : IBinaryInputBase
position = 0;
bufferLength = _scratchLength;
// After the read completes, the position will be at 'needed' in scratch.
// The next EnsureAvailable will fail (scratch is consumed) and call TryAdvanceSegment again,
// which will set up the remainder of the current segment.
// We need to adjust the current segment's offset to skip the bytes we already copied.
// Adjust the current segment to skip the bytes we already copied.
// The _afterCrossBoundary flag ensures the next TryAdvanceSegment
// loads this remainder without incrementing _currentSegment.
_segments[_currentSegment] = new ArraySegment<byte>(
nextSeg.Array!,
nextSeg.Offset + fromNext,
nextSeg.Count - fromNext);
_afterCrossBoundary = true;
return true;
}
}

View File

@ -11,7 +11,11 @@ public class SignalRDataSourceTests_List_Json : SignalRDataSourceTestBase<TestOr
{
protected override AcSerializerOptions SerializerOption => new AcJsonSerializerOptions();
protected override TestOrderItemListDataSource CreateDataSource(TestableSignalRClient2 client, SignalRCrudTags crudTags)
=> new(client, crudTags);
{
var ds = new TestOrderItemListDataSource(client, crudTags);
ds.SerializerType = AcSerializerType.Json;
return ds;
}
[TestMethod]
public override async Task LoadDataSource_ReturnsAllItems() => await base.LoadDataSource_ReturnsAllItems();

View File

@ -11,7 +11,11 @@ public class SignalRDataSourceTests_Observable_Json : SignalRDataSourceTestBase<
{
protected override AcSerializerOptions SerializerOption => new AcJsonSerializerOptions();
protected override TestOrderItemObservableDataSource CreateDataSource(TestableSignalRClient2 client, SignalRCrudTags crudTags)
=> new(client, crudTags);
{
var ds = new TestOrderItemObservableDataSource(client, crudTags);
ds.SerializerType = AcSerializerType.Json;
return ds;
}
[TestMethod]
public override async Task LoadDataSource_ReturnsAllItems() => await base.LoadDataSource_ReturnsAllItems();

View File

@ -87,8 +87,8 @@ public class TestableSignalRHub2 : AcWebSignalRHubBase<TestSignalRTags, TestLogg
#region Overridden Response Methods
protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId)
=> SendMessageToClient(_callerClient, messageTag, status, responseData, requestId);
protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId, SignalParams? clientSignalParams = null)
=> SendMessageToClient(_callerClient, messageTag, status, responseData, requestId, clientSignalParams);
#endregion
}

View File

@ -180,6 +180,12 @@ namespace AyCode.Services.Server.SignalRs
public AcSignalRClientBase SignalRClient;
protected readonly SignalRCrudTags SignalRCrudTags;
/// <summary>
/// Serializer type for RawBytes path. Matches server's SerializerOptions.SerializerType.
/// Default: Binary. Override for JSON/GZip datasources.
/// </summary>
public AcSerializerType SerializerType { get; set; } = AcSerializerType.Binary;
public Func<ItemChangedEventArgs<TDataItem>, Task>? OnDataSourceItemChanged;
public Func<Task>? OnDataSourceLoaded;
@ -260,17 +266,18 @@ namespace AyCode.Services.Server.SignalRs
/// </summary>
public async Task LoadDataSource(bool clearChangeTracking = true)
{
if (SignalRCrudTags.GetAllMessageTag == AcSignalRTags.None)
if (SignalRCrudTags.GetAllMessageTag == AcSignalRTags.None)
throw new ArgumentException($"SignalRCrudTags.GetAllMessageTag == SignalRTags.None");
BeginSync();
try
{
var response = await SignalRClient.GetAllAsync<SignalResponseDataMessage>(SignalRCrudTags.GetAllMessageTag, GetContextParams());
if (response?.Status != SignalResponseStatus.Success || response.RawResponseData == null)
throw new NullReferenceException($"LoadDataSource; Status: {response?.Status}");
// RawBytes: T=byte[] → server pre-serializes, protocol passes through.
// Single serialize (server) → single deserialize (PopulateMerge below).
var rawBytes = await SignalRClient.GetAllAsync<byte[]>(SignalRCrudTags.GetAllMessageTag, GetContextParams())
?? throw new NullReferenceException("LoadDataSource; null response");
await LoadDataSourceFromResponseData(response.RawResponseData, response.DataSerializerType,
await LoadDataSourceFromResponseData(rawBytes, SerializerType,
false, false, clearChangeTracking);
}
finally
@ -280,8 +287,8 @@ namespace AyCode.Services.Server.SignalRs
}
/// <summary>
/// GetAllMessageTag - Async callback version with optimized direct populate.
/// Protocol deserializes directly to TIList — no intermediate byte[] or SignalData.
/// GetAllMessageTag - Async callback version.
/// RawBytes path: server pre-serializes → byte[] flows through protocol → PopulateMerge.
/// </summary>
public Task LoadDataSourceAsync(bool clearChangeTracking = true)
{
@ -289,16 +296,15 @@ namespace AyCode.Services.Server.SignalRs
throw new ArgumentException($"SignalRCrudTags.GetAllMessageTag == SignalRTags.None");
BeginSync();
return SignalRClient.GetAllAsync<SignalResponseDataMessage>(SignalRCrudTags.GetAllMessageTag, GetContextParams())
return SignalRClient.GetAllAsync<byte[]>(SignalRCrudTags.GetAllMessageTag, GetContextParams())
.ContinueWith(async responseTask =>
{
try
{
var response = await responseTask;
if (response?.Status != SignalResponseStatus.Success || response.RawResponseData == null)
throw new NullReferenceException($"LoadDataSourceAsync; Status: {response?.Status}");
var rawBytes = await responseTask
?? throw new NullReferenceException("LoadDataSourceAsync; null response");
await LoadDataSourceFromResponseData(response.RawResponseData, response.DataSerializerType,
await LoadDataSourceFromResponseData(rawBytes, SerializerType,
false, false, clearChangeTracking);
}
finally

View File

@ -25,6 +25,11 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
protected AcSerializerOptions SerializerOptions = new AcBinarySerializerOptions();
/// <summary>
/// Enable diagnostic logging for binary serialization debugging.
/// Set to true to log hex dumps of serialized response data.
/// </summary>
public static bool EnableBinaryDiagnostics { get; set; }
#region Connection Lifecycle
@ -177,7 +182,7 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
if (Logger.LogLevel <= LogLevel.Debug)
Logger.Debug($"responseData ready ({SerializerOptions.SerializerType})");
await ResponseToCaller(messageTag, SignalResponseStatus.Success, responseData, requestId);
await ResponseToCaller(messageTag, SignalResponseStatus.Success, responseData, requestId, signalParams);
return;
}
@ -263,8 +268,8 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
protected virtual Task ResponseToCallerWithContent(int messageTag, object? content)
=> SendMessageToClient(Clients.Caller, messageTag, SignalResponseStatus.Success, content);
protected virtual Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId)
=> SendMessageToClient(Clients.Caller, messageTag, status, responseData, requestId);
protected virtual Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId, SignalParams? clientSignalParams = null)
=> SendMessageToClient(Clients.Caller, messageTag, status, responseData, requestId, clientSignalParams);
protected virtual Task SendMessageToUserIdWithContent(string userId, int messageTag, object? content)
=> SendMessageToClient(Clients.User(userId), messageTag, SignalResponseStatus.Success, content);
@ -280,15 +285,28 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
/// <summary>
/// Sends message to client. Protocol serializes responseData directly to pipe (zero-copy write).
/// clientSignalParams: the original client request's SignalParams (contains IsRawBytesData flag).
/// </summary>
protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag,
SignalResponseStatus status, object? responseData, int? requestId = null)
SignalResponseStatus status, object? responseData, int? requestId = null, SignalParams? clientSignalParams = null)
{
var isRawBytes = clientSignalParams?.IsRawBytesData == true;
// IsRawBytesData: client wants raw byte[] — pre-serialize here, protocol passes through as-is.
// Single serialize (here) → single deserialize (consumer). No double ser/deser.
if (isRawBytes && responseData != null && responseData is not byte[])
{
responseData = SerializerOptions.SerializerType == AcSerializerType.Binary
? AcBinarySerializer.Serialize(responseData)
: AyCode.Core.Compression.GzipHelper.Compress(responseData.ToJson());
}
var signalParams = new SignalParams
{
Status = status,
DataSerializerType = SerializerOptions.SerializerType,
SignalDataType = responseData?.GetType().AssemblyQualifiedName
SignalDataType = isRawBytes ? null : responseData?.GetType().AssemblyQualifiedName,
IsRawBytesData = isRawBytes
};
var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);

View File

@ -454,30 +454,57 @@ public class AcBinaryHubProtocol : IHubProtocol
var argSlice = r.UnreadSequence.Slice(0, argLength);
r.Advance(argLength);
// Response data: resolve actual type from SignalDataType for eager deserialization
if (targetType == typeof(object) && _currentSignalParams?.SignalDataType != null)
// byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag →
// strip tag + VarUInt length prefix, return raw payload. No deserializer.
var argReader = new SequenceReader<byte>(argSlice);
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
{
var dataType = Type.GetType(_currentSignalParams.SignalDataType);
if (dataType != null)
targetType = dataType;
argReader.Advance(1); // skip tag
var payloadLength = (int)ReadVarUInt(ref argReader);
return SequenceToByteArray(argReader.UnreadSequence.Slice(0, payloadLength));
}
// IsRawBytesData: server serialized the object normally (no byte[] fast-path on write).
// argSlice IS the serialized data. Return it as raw byte[] — no deserialization.
// Consumer (e.g. DataSource.PopulateMerge) deserializes it.
if (_currentSignalParams is { IsRawBytesData: true })
return SequenceToByteArray(argSlice);
if (targetType == typeof(object) && _currentSignalParams != null)
{
// Typed response: resolve actual type from SignalDataType for eager deserialization
if (_currentSignalParams.SignalDataType != null)
{
var dataType = Type.GetType(_currentSignalParams.SignalDataType);
if (dataType != null)
targetType = dataType;
}
}
return DeserializeFromSequence(argSlice, targetType, _options);
}
/// <summary>
/// Deserializes from a ReadOnlySequence, using the pipe's backing byte[] when possible (zero-copy).
/// Only copies for rare multi-segment arguments that span pipe buffer boundaries.
/// Returns raw byte[] from the pipe sequence without any deserialization.
/// Zero-copy when single-segment (TryGetArray), copies only for rare multi-segment.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
{
if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg)
&& seg.Offset == 0 && seg.Count == seg.Array!.Length)
return seg.Array;
return data.ToArray();
}
/// <summary>
/// Deserializes from a ReadOnlySequence via AcBinaryDeserializer.
/// Single-segment: zero-copy via ArrayBinaryInput. Multi-segment: SequenceBinaryInput (no copy).
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static object? DeserializeFromSequence(ReadOnlySequence<byte> data, Type targetType, AcBinarySerializerOptions options)
{
if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg))
return AcBinaryDeserializer.Deserialize(seg.Array!, seg.Offset, (int)data.Length, targetType, options);
var bytes = data.ToArray();
return AcBinaryDeserializer.Deserialize(bytes, 0, bytes.Length, targetType, options);
}
=> AcBinaryDeserializer.Deserialize(data, targetType, options);
#endregion

View File

@ -21,6 +21,11 @@ namespace AyCode.Services.SignalRs
protected readonly HubConnection? HubConnection;
protected readonly AcLoggerBase Logger;
/// <summary>
/// Enable diagnostic logging for binary serialization debugging.
/// </summary>
public static bool EnableBinaryDiagnostics { get; set; }
protected abstract Task MessageReceived(int messageTag, SignalParams signalParams, object data);
public int MsDelay = 25;
@ -136,7 +141,13 @@ namespace AyCode.Services.SignalRs
public virtual Task SendMessageToServerAsync(int messageTag)
=> SendMessageToServerAsync(messageTag, (object[]?)null, GetNextRequestId());
public virtual async Task SendMessageToServerAsync(int messageTag, object[]? parameters, int? requestId)
public virtual Task SendMessageToServerAsync(int messageTag, object[]? parameters, int? requestId)
=> SendCoreAsync(messageTag, parameters, requestId, new SignalParams { Status = SignalResponseStatus.Success });
/// <summary>
/// Core send: takes a pre-built SignalParams (caller controls IsRawBytesData etc.)
/// </summary>
protected async Task SendCoreAsync(int messageTag, object[]? parameters, int? requestId, SignalParams signalParams)
{
Logger.DebugConditional($"Client SendMessageToServerAsync sending; {nameof(requestId)}: {requestId}; ConnectionState: {GetConnectionState()}; {ConstHelper.NameByValue(TagsName, messageTag)}");
@ -148,8 +159,6 @@ namespace AyCode.Services.SignalRs
return;
}
var signalParams = new SignalParams { Status = SignalResponseStatus.Success };
if (parameters is { Length: > 0 })
signalParams.SetParameterValues(parameters);
@ -362,7 +371,11 @@ namespace AyCode.Services.SignalRs
var requestModel = SignalRRequestModelPool.Get();
_responseByRequestId[requestId] = requestModel;
await SendMessageToServerAsync(messageTag, parameters, requestId);
await SendCoreAsync(messageTag, parameters, requestId, new SignalParams
{
Status = SignalResponseStatus.Success,
IsRawBytesData = typeof(TResponse) == typeof(byte[])
});
try
{

View File

@ -37,10 +37,17 @@ public class SignalParams : ISignalParams
/// <summary>
/// AssemblyQualifiedName of the response data type.
/// Set by server before sending. Protocol uses this to deserialize directly to the target type.
/// null = raw byte[] (populate/merge path).
/// </summary>
public string? SignalDataType { get; set; }
/// <summary>
/// Client sets true when requesting raw byte[] (e.g. DataSource populate/merge).
/// Server: reads this from client's SignalParams → serializes object → byte[] directly.
/// Protocol: reads this from response SignalParams → skips deserialization, returns raw byte[].
/// Result: single serialize (server) → single deserialize (consumer). No double ser/deser.
/// </summary>
public bool IsRawBytesData { get; set; }
/// <summary>
/// Cached deserialized byte[][] from Parameters.
/// </summary>