diff --git a/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs b/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs index 66224ab..4c170b7 100644 --- a/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs +++ b/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs @@ -247,8 +247,8 @@ public class BenchmarkSignalRHub : AcWebSignalRHubBase "benchmark-user"; protected override ClaimsPrincipal? GetUser() => null; - protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId) - => SendMessageToClient(_callerClient, messageTag, status, responseData, requestId); + protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId, SignalParams? clientSignalParams = null) + => SendMessageToClient(_callerClient, messageTag, status, responseData, requestId, clientSignalParams); } /// diff --git a/AyCode.Core/Serializers/Binaries/SequenceBinaryInput.cs b/AyCode.Core/Serializers/Binaries/SequenceBinaryInput.cs index d7d2990..6fbc06f 100644 --- a/AyCode.Core/Serializers/Binaries/SequenceBinaryInput.cs +++ b/AyCode.Core/Serializers/Binaries/SequenceBinaryInput.cs @@ -25,6 +25,10 @@ public struct SequenceBinaryInput : IBinaryInputBase private byte[]? _scratchBuffer; private int _scratchLength; + // After a cross-boundary read, the next TryAdvanceSegment must load + // the remainder of _currentSegment (already adjusted) without incrementing. + private bool _afterCrossBoundary; + /// /// Creates a SequenceBinaryInput from a multi-segment ReadOnlySequence. /// Pre-extracts all segments as ArraySegment for fast iteration. @@ -55,6 +59,7 @@ public struct SequenceBinaryInput : IBinaryInputBase _currentSegment = 0; _scratchBuffer = null; _scratchLength = 0; + _afterCrossBoundary = false; } /// @@ -79,6 +84,18 @@ public struct SequenceBinaryInput : IBinaryInputBase [MethodImpl(MethodImplOptions.NoInlining)] public bool TryAdvanceSegment(ref byte[] buffer, ref int position, ref int bufferLength, int needed) { + // After cross-boundary scratch read: load the remainder of the current segment + // (already adjusted in TryReadCrossBoundary) without incrementing. + if (_afterCrossBoundary) + { + _afterCrossBoundary = false; + var seg = _segments[_currentSegment]; + buffer = seg.Array!; + position = seg.Offset; + bufferLength = seg.Offset + seg.Count; + return seg.Count > 0; + } + // Calculate remaining bytes in current segment var remaining = bufferLength - position; @@ -93,10 +110,10 @@ public struct SequenceBinaryInput : IBinaryInputBase if (_currentSegment >= _segments.Length) return false; - var seg = _segments[_currentSegment]; - buffer = seg.Array!; - position = seg.Offset; - bufferLength = seg.Offset + seg.Count; + var seg2 = _segments[_currentSegment]; + buffer = seg2.Array!; + position = seg2.Offset; + bufferLength = seg2.Offset + seg2.Count; return true; } @@ -129,15 +146,15 @@ public struct SequenceBinaryInput : IBinaryInputBase position = 0; bufferLength = _scratchLength; - // After the read completes, the position will be at 'needed' in scratch. - // The next EnsureAvailable will fail (scratch is consumed) and call TryAdvanceSegment again, - // which will set up the remainder of the current segment. - // We need to adjust the current segment's offset to skip the bytes we already copied. + // Adjust the current segment to skip the bytes we already copied. + // The _afterCrossBoundary flag ensures the next TryAdvanceSegment + // loads this remainder without incrementing _currentSegment. _segments[_currentSegment] = new ArraySegment( nextSeg.Array!, nextSeg.Offset + fromNext, nextSeg.Count - fromNext); + _afterCrossBoundary = true; return true; } } diff --git a/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_List_Json.cs b/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_List_Json.cs index b353735..e127882 100644 --- a/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_List_Json.cs +++ b/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_List_Json.cs @@ -11,7 +11,11 @@ public class SignalRDataSourceTests_List_Json : SignalRDataSourceTestBase new AcJsonSerializerOptions(); protected override TestOrderItemListDataSource CreateDataSource(TestableSignalRClient2 client, SignalRCrudTags crudTags) - => new(client, crudTags); + { + var ds = new TestOrderItemListDataSource(client, crudTags); + ds.SerializerType = AcSerializerType.Json; + return ds; + } [TestMethod] public override async Task LoadDataSource_ReturnsAllItems() => await base.LoadDataSource_ReturnsAllItems(); diff --git a/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_Observable_Json.cs b/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_Observable_Json.cs index b804455..a5f866f 100644 --- a/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_Observable_Json.cs +++ b/AyCode.Services.Server.Tests/SignalRs/SignalRDatasources/SignalRDataSourceTests_Observable_Json.cs @@ -11,7 +11,11 @@ public class SignalRDataSourceTests_Observable_Json : SignalRDataSourceTestBase< { protected override AcSerializerOptions SerializerOption => new AcJsonSerializerOptions(); protected override TestOrderItemObservableDataSource CreateDataSource(TestableSignalRClient2 client, SignalRCrudTags crudTags) - => new(client, crudTags); + { + var ds = new TestOrderItemObservableDataSource(client, crudTags); + ds.SerializerType = AcSerializerType.Json; + return ds; + } [TestMethod] public override async Task LoadDataSource_ReturnsAllItems() => await base.LoadDataSource_ReturnsAllItems(); diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs index 3aa4b3b..9310ad3 100644 --- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs +++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs @@ -87,8 +87,8 @@ public class TestableSignalRHub2 : AcWebSignalRHubBase SendMessageToClient(_callerClient, messageTag, status, responseData, requestId); + protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId, SignalParams? clientSignalParams = null) + => SendMessageToClient(_callerClient, messageTag, status, responseData, requestId, clientSignalParams); #endregion } diff --git a/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs b/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs index 86601eb..c7720ba 100644 --- a/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs +++ b/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs @@ -180,6 +180,12 @@ namespace AyCode.Services.Server.SignalRs public AcSignalRClientBase SignalRClient; protected readonly SignalRCrudTags SignalRCrudTags; + /// + /// Serializer type for RawBytes path. Matches server's SerializerOptions.SerializerType. + /// Default: Binary. Override for JSON/GZip datasources. + /// + public AcSerializerType SerializerType { get; set; } = AcSerializerType.Binary; + public Func, Task>? OnDataSourceItemChanged; public Func? OnDataSourceLoaded; @@ -260,17 +266,18 @@ namespace AyCode.Services.Server.SignalRs /// public async Task LoadDataSource(bool clearChangeTracking = true) { - if (SignalRCrudTags.GetAllMessageTag == AcSignalRTags.None) + if (SignalRCrudTags.GetAllMessageTag == AcSignalRTags.None) throw new ArgumentException($"SignalRCrudTags.GetAllMessageTag == SignalRTags.None"); BeginSync(); try { - var response = await SignalRClient.GetAllAsync(SignalRCrudTags.GetAllMessageTag, GetContextParams()); - if (response?.Status != SignalResponseStatus.Success || response.RawResponseData == null) - throw new NullReferenceException($"LoadDataSource; Status: {response?.Status}"); + // RawBytes: T=byte[] → server pre-serializes, protocol passes through. + // Single serialize (server) → single deserialize (PopulateMerge below). + var rawBytes = await SignalRClient.GetAllAsync(SignalRCrudTags.GetAllMessageTag, GetContextParams()) + ?? throw new NullReferenceException("LoadDataSource; null response"); - await LoadDataSourceFromResponseData(response.RawResponseData, response.DataSerializerType, + await LoadDataSourceFromResponseData(rawBytes, SerializerType, false, false, clearChangeTracking); } finally @@ -280,8 +287,8 @@ namespace AyCode.Services.Server.SignalRs } /// - /// GetAllMessageTag - Async callback version with optimized direct populate. - /// Protocol deserializes directly to TIList — no intermediate byte[] or SignalData. + /// GetAllMessageTag - Async callback version. + /// RawBytes path: server pre-serializes → byte[] flows through protocol → PopulateMerge. /// public Task LoadDataSourceAsync(bool clearChangeTracking = true) { @@ -289,16 +296,15 @@ namespace AyCode.Services.Server.SignalRs throw new ArgumentException($"SignalRCrudTags.GetAllMessageTag == SignalRTags.None"); BeginSync(); - return SignalRClient.GetAllAsync(SignalRCrudTags.GetAllMessageTag, GetContextParams()) + return SignalRClient.GetAllAsync(SignalRCrudTags.GetAllMessageTag, GetContextParams()) .ContinueWith(async responseTask => { try { - var response = await responseTask; - if (response?.Status != SignalResponseStatus.Success || response.RawResponseData == null) - throw new NullReferenceException($"LoadDataSourceAsync; Status: {response?.Status}"); + var rawBytes = await responseTask + ?? throw new NullReferenceException("LoadDataSourceAsync; null response"); - await LoadDataSourceFromResponseData(response.RawResponseData, response.DataSerializerType, + await LoadDataSourceFromResponseData(rawBytes, SerializerType, false, false, clearChangeTracking); } finally diff --git a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs index d97db0f..b4f362f 100644 --- a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs +++ b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs @@ -25,6 +25,11 @@ public abstract class AcWebSignalRHubBase(IConfiguration protected AcSerializerOptions SerializerOptions = new AcBinarySerializerOptions(); + /// + /// Enable diagnostic logging for binary serialization debugging. + /// Set to true to log hex dumps of serialized response data. + /// + public static bool EnableBinaryDiagnostics { get; set; } #region Connection Lifecycle @@ -177,7 +182,7 @@ public abstract class AcWebSignalRHubBase(IConfiguration if (Logger.LogLevel <= LogLevel.Debug) Logger.Debug($"responseData ready ({SerializerOptions.SerializerType})"); - await ResponseToCaller(messageTag, SignalResponseStatus.Success, responseData, requestId); + await ResponseToCaller(messageTag, SignalResponseStatus.Success, responseData, requestId, signalParams); return; } @@ -263,8 +268,8 @@ public abstract class AcWebSignalRHubBase(IConfiguration protected virtual Task ResponseToCallerWithContent(int messageTag, object? content) => SendMessageToClient(Clients.Caller, messageTag, SignalResponseStatus.Success, content); - protected virtual Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId) - => SendMessageToClient(Clients.Caller, messageTag, status, responseData, requestId); + protected virtual Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId, SignalParams? clientSignalParams = null) + => SendMessageToClient(Clients.Caller, messageTag, status, responseData, requestId, clientSignalParams); protected virtual Task SendMessageToUserIdWithContent(string userId, int messageTag, object? content) => SendMessageToClient(Clients.User(userId), messageTag, SignalResponseStatus.Success, content); @@ -280,15 +285,28 @@ public abstract class AcWebSignalRHubBase(IConfiguration /// /// Sends message to client. Protocol serializes responseData directly to pipe (zero-copy write). + /// clientSignalParams: the original client request's SignalParams (contains IsRawBytesData flag). /// protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, - SignalResponseStatus status, object? responseData, int? requestId = null) + SignalResponseStatus status, object? responseData, int? requestId = null, SignalParams? clientSignalParams = null) { + var isRawBytes = clientSignalParams?.IsRawBytesData == true; + + // IsRawBytesData: client wants raw byte[] — pre-serialize here, protocol passes through as-is. + // Single serialize (here) → single deserialize (consumer). No double ser/deser. + if (isRawBytes && responseData != null && responseData is not byte[]) + { + responseData = SerializerOptions.SerializerType == AcSerializerType.Binary + ? AcBinarySerializer.Serialize(responseData) + : AyCode.Core.Compression.GzipHelper.Compress(responseData.ToJson()); + } + var signalParams = new SignalParams { Status = status, DataSerializerType = SerializerOptions.SerializerType, - SignalDataType = responseData?.GetType().AssemblyQualifiedName + SignalDataType = isRawBytes ? null : responseData?.GetType().AssemblyQualifiedName, + IsRawBytesData = isRawBytes }; var tagName = ConstHelper.NameByValue(messageTag); diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index b88474f..ab243be 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -454,30 +454,57 @@ public class AcBinaryHubProtocol : IHubProtocol var argSlice = r.UnreadSequence.Slice(0, argLength); r.Advance(argLength); - // Response data: resolve actual type from SignalDataType for eager deserialization - if (targetType == typeof(object) && _currentSignalParams?.SignalDataType != null) + // byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag → + // strip tag + VarUInt length prefix, return raw payload. No deserializer. + var argReader = new SequenceReader(argSlice); + if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray) { - var dataType = Type.GetType(_currentSignalParams.SignalDataType); - if (dataType != null) - targetType = dataType; + argReader.Advance(1); // skip tag + var payloadLength = (int)ReadVarUInt(ref argReader); + return SequenceToByteArray(argReader.UnreadSequence.Slice(0, payloadLength)); + } + + // IsRawBytesData: server serialized the object normally (no byte[] fast-path on write). + // argSlice IS the serialized data. Return it as raw byte[] — no deserialization. + // Consumer (e.g. DataSource.PopulateMerge) deserializes it. + if (_currentSignalParams is { IsRawBytesData: true }) + return SequenceToByteArray(argSlice); + + if (targetType == typeof(object) && _currentSignalParams != null) + { + // Typed response: resolve actual type from SignalDataType for eager deserialization + if (_currentSignalParams.SignalDataType != null) + { + var dataType = Type.GetType(_currentSignalParams.SignalDataType); + if (dataType != null) + targetType = dataType; + } } return DeserializeFromSequence(argSlice, targetType, _options); } /// - /// Deserializes from a ReadOnlySequence, using the pipe's backing byte[] when possible (zero-copy). - /// Only copies for rare multi-segment arguments that span pipe buffer boundaries. + /// Returns raw byte[] from the pipe sequence without any deserialization. + /// Zero-copy when single-segment (TryGetArray), copies only for rare multi-segment. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static byte[] SequenceToByteArray(ReadOnlySequence data) + { + if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg) + && seg.Offset == 0 && seg.Count == seg.Array!.Length) + return seg.Array; + + return data.ToArray(); + } + + /// + /// Deserializes from a ReadOnlySequence via AcBinaryDeserializer. + /// Single-segment: zero-copy via ArrayBinaryInput. Multi-segment: SequenceBinaryInput (no copy). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private static object? DeserializeFromSequence(ReadOnlySequence data, Type targetType, AcBinarySerializerOptions options) - { - if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg)) - return AcBinaryDeserializer.Deserialize(seg.Array!, seg.Offset, (int)data.Length, targetType, options); - - var bytes = data.ToArray(); - return AcBinaryDeserializer.Deserialize(bytes, 0, bytes.Length, targetType, options); - } + => AcBinaryDeserializer.Deserialize(data, targetType, options); #endregion diff --git a/AyCode.Services/SignalRs/AcSignalRClientBase.cs b/AyCode.Services/SignalRs/AcSignalRClientBase.cs index 866b347..4b5dfe0 100644 --- a/AyCode.Services/SignalRs/AcSignalRClientBase.cs +++ b/AyCode.Services/SignalRs/AcSignalRClientBase.cs @@ -21,6 +21,11 @@ namespace AyCode.Services.SignalRs protected readonly HubConnection? HubConnection; protected readonly AcLoggerBase Logger; + /// + /// Enable diagnostic logging for binary serialization debugging. + /// + public static bool EnableBinaryDiagnostics { get; set; } + protected abstract Task MessageReceived(int messageTag, SignalParams signalParams, object data); public int MsDelay = 25; @@ -136,7 +141,13 @@ namespace AyCode.Services.SignalRs public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, (object[]?)null, GetNextRequestId()); - public virtual async Task SendMessageToServerAsync(int messageTag, object[]? parameters, int? requestId) + public virtual Task SendMessageToServerAsync(int messageTag, object[]? parameters, int? requestId) + => SendCoreAsync(messageTag, parameters, requestId, new SignalParams { Status = SignalResponseStatus.Success }); + + /// + /// Core send: takes a pre-built SignalParams (caller controls IsRawBytesData etc.) + /// + protected async Task SendCoreAsync(int messageTag, object[]? parameters, int? requestId, SignalParams signalParams) { Logger.DebugConditional($"Client SendMessageToServerAsync sending; {nameof(requestId)}: {requestId}; ConnectionState: {GetConnectionState()}; {ConstHelper.NameByValue(TagsName, messageTag)}"); @@ -148,8 +159,6 @@ namespace AyCode.Services.SignalRs return; } - var signalParams = new SignalParams { Status = SignalResponseStatus.Success }; - if (parameters is { Length: > 0 }) signalParams.SetParameterValues(parameters); @@ -362,7 +371,11 @@ namespace AyCode.Services.SignalRs var requestModel = SignalRRequestModelPool.Get(); _responseByRequestId[requestId] = requestModel; - await SendMessageToServerAsync(messageTag, parameters, requestId); + await SendCoreAsync(messageTag, parameters, requestId, new SignalParams + { + Status = SignalResponseStatus.Success, + IsRawBytesData = typeof(TResponse) == typeof(byte[]) + }); try { diff --git a/AyCode.Services/SignalRs/ISignalParams.cs b/AyCode.Services/SignalRs/ISignalParams.cs index 710e968..32c53a4 100644 --- a/AyCode.Services/SignalRs/ISignalParams.cs +++ b/AyCode.Services/SignalRs/ISignalParams.cs @@ -37,10 +37,17 @@ public class SignalParams : ISignalParams /// /// AssemblyQualifiedName of the response data type. /// Set by server before sending. Protocol uses this to deserialize directly to the target type. - /// null = raw byte[] (populate/merge path). /// public string? SignalDataType { get; set; } + /// + /// Client sets true when requesting raw byte[] (e.g. DataSource populate/merge). + /// Server: reads this from client's SignalParams → serializes object → byte[] directly. + /// Protocol: reads this from response SignalParams → skips deserialization, returns raw byte[]. + /// Result: single serialize (server) → single deserialize (consumer). No double ser/deser. + /// + public bool IsRawBytesData { get; set; } + /// /// Cached deserialized byte[][] from Parameters. ///