From 2d04b9f8f6f1991d8d4c2c5a2277fd4429321350 Mon Sep 17 00:00:00 2001 From: Loretta Date: Mon, 6 Apr 2026 22:45:00 +0200 Subject: [PATCH] Zero-copy SignalR: direct object response, no SignalData Major overhaul for SignalR response pipeline: - All deserialization now uses byte[] (offset/length) for zero-copy, allocation-free operation; all span/memory overloads removed. - SignalR protocol sends (signalParams, object) directly; SignalData envelope and related logic removed. - Server sets SignalParams.SignalDataType so protocol deserializes to the correct runtime type on the client. - SignalResponseDataMessage now only used for client request/response tracking and stream path; RawResponseData holds the actual object. - All extension methods, helpers, and infrastructure updated to use new byte[]-based APIs. - AcSignalRDataSource and all test/benchmark code updated for new object flow. - Removes all diagnostics, logging, and error handling related to binary envelopes. - Enables true zero-copy, type-safe, allocation-free SignalR response handling. --- AyCode.Benchmark/Program.cs | 2 +- AyCode.Benchmark/SerializationBenchmarks.cs | 4 +- .../SignalRRoundTripBenchmarks.cs | 10 +- .../AcBinarySerializerChainTests.cs | 10 +- .../Serialization/QuickBenchmark.cs | 8 +- .../Extensions/SerializeObjectExtensions.cs | 101 +---- ...serializer.BinaryDeserializationContext.cs | 37 +- .../AcBinaryDeserializer.CrossType.cs | 62 +-- .../Binaries/AcBinaryDeserializer.cs | 163 +++---- .../Serializers/Binaries/ArrayBinaryInput.cs | 14 +- .../SignalRs/SignalRTestHelper.cs | 2 +- .../SignalRs/TestableSignalRClient2.cs | 6 +- .../SignalRs/TestableSignalRHub2.cs | 10 +- .../SignalRs/AcSignalRDataSource.cs | 134 ++++-- .../SignalRs/AcSignalRSendToClientService.cs | 11 +- .../SignalRs/AcWebSignalRHubBase.cs | 257 ++--------- .../SignalRs/PostJsonDataMessageTests.cs | 5 +- .../SignalRs/AcBinaryHubProtocol.cs | 407 ++++++++---------- .../SignalRs/AcSignalRClientBase.cs | 127 +----- .../SignalRs/AyCodeBinaryHubProtocol.cs | 16 +- AyCode.Services/SignalRs/IAcSignalRHubBase.cs | 2 +- .../SignalRs/IAcSignalRHubClient.cs | 294 +------------ AyCode.Services/SignalRs/ISignalParams.cs | 7 + AyCode.Services/SignalRs/SignalData.cs | 52 --- .../SignalRs/SignalRSerializationHelper.cs | 9 - 25 files changed, 494 insertions(+), 1256 deletions(-) delete mode 100644 AyCode.Services/SignalRs/SignalData.cs diff --git a/AyCode.Benchmark/Program.cs b/AyCode.Benchmark/Program.cs index 648da8c..ef640de 100644 --- a/AyCode.Benchmark/Program.cs +++ b/AyCode.Benchmark/Program.cs @@ -280,7 +280,7 @@ namespace AyCode.Benchmark for (int i = 0; i < iterations; i++) { var target = CreatePopulateTarget(testOrder); - AcBinaryDeserializer.PopulateMerge(acBinaryNoRef.AsSpan(), target); + AcBinaryDeserializer.PopulateMerge(acBinaryNoRef, target); } var acMerge = sw.Elapsed.TotalMilliseconds; results.Add(("Merge", "NoRef", acMerge, 0)); diff --git a/AyCode.Benchmark/SerializationBenchmarks.cs b/AyCode.Benchmark/SerializationBenchmarks.cs index 7fed174..f226491 100644 --- a/AyCode.Benchmark/SerializationBenchmarks.cs +++ b/AyCode.Benchmark/SerializationBenchmarks.cs @@ -380,7 +380,7 @@ public class AcBinaryVsMessagePackFullBenchmark public void PopulateMerge_AcBinary_WithRef() { var target = CreatePopulateTarget(); - AcBinaryDeserializer.PopulateMerge(_acBinaryWithRef.AsSpan(), target); + AcBinaryDeserializer.PopulateMerge(_acBinaryWithRef, target); } [Benchmark(Description = "AcBinary PopulateMerge NoRef")] @@ -388,7 +388,7 @@ public class AcBinaryVsMessagePackFullBenchmark { // Create fresh target each time to avoid state accumulation var target = CreatePopulateTarget(); - AcBinaryDeserializer.PopulateMerge(_acBinaryNoRef.AsSpan(), target); + AcBinaryDeserializer.PopulateMerge(_acBinaryNoRef, target); } private TestOrder CreatePopulateTarget() diff --git a/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs b/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs index 5845062..66224ab 100644 --- a/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs +++ b/AyCode.Benchmark/SignalRRoundTripBenchmarks.cs @@ -212,16 +212,16 @@ public class BenchmarkSignalRClient : AcSignalRClientBase, IAcSignalRHubItemServ public TResponse? GetAllSync(int tag) => GetAllAsync(tag).GetAwaiter().GetResult(); - protected override Task MessageReceived(int messageTag, SignalParams signalParams, SignalData data) => Task.CompletedTask; + protected override Task MessageReceived(int messageTag, SignalParams signalParams, object data) => Task.CompletedTask; protected override HubConnectionState GetConnectionState() => HubConnectionState.Connected; protected override bool IsConnected() => true; protected override Task StartConnectionInternal() => Task.CompletedTask; protected override Task StopConnectionInternal() => Task.CompletedTask; protected override ValueTask DisposeConnectionInternal() => ValueTask.CompletedTask; - protected override async Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, byte[]? messageBytes) + protected override async Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, object? data) { - await _hub.OnReceiveMessage(messageTag, requestId, signalParams, new SignalData(messageBytes ?? [])); + await _hub.OnReceiveMessage(messageTag, requestId, signalParams, data ?? Array.Empty()); } } @@ -247,8 +247,8 @@ public class BenchmarkSignalRHub : AcWebSignalRHubBase "benchmark-user"; protected override ClaimsPrincipal? GetUser() => null; - protected override Task ResponseToCaller(int messageTag, ISignalRMessage message, int? requestId) - => SendMessageToClient(_callerClient, messageTag, message, requestId); + protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId) + => SendMessageToClient(_callerClient, messageTag, status, responseData, requestId); } /// diff --git a/AyCode.Core.Tests/Serialization/AcBinarySerializerChainTests.cs b/AyCode.Core.Tests/Serialization/AcBinarySerializerChainTests.cs index 0547744..f296e3a 100644 --- a/AyCode.Core.Tests/Serialization/AcBinarySerializerChainTests.cs +++ b/AyCode.Core.Tests/Serialization/AcBinarySerializerChainTests.cs @@ -311,15 +311,14 @@ public class AcBinarySerializerChainTests } [TestMethod] - public void DeserializeChain_ReadOnlyMemory_WorksCorrectly() + public void DeserializeChain_ByteArray_WorksCorrectly() { // Arrange var original = new TestSimpleClass { Id = 42, Name = "Memory Test" }; var binary = original.ToBinary(); - ReadOnlyMemory memory = binary; // Act - using var chain = memory.BinaryToChain(); + using var chain = binary.BinaryToChain(); var result = chain.Value; // Assert @@ -329,16 +328,15 @@ public class AcBinarySerializerChainTests } [TestMethod] - public void PopulateChain_ReadOnlyMemory_WorksCorrectly() + public void PopulateChain_ByteArray_WorksCorrectly() { // Arrange var original = new TestSimpleClass { Id = 99, Name = "Memory Update" }; var binary = original.ToBinary(); - ReadOnlyMemory memory = binary; var target = new TestSimpleClass { Id = 1, Name = "Old" }; // Act - using var chain = memory.BinaryToChain(target); + using var chain = binary.BinaryToChain(target); // Assert Assert.AreEqual(99, target.Id); diff --git a/AyCode.Core.Tests/Serialization/QuickBenchmark.cs b/AyCode.Core.Tests/Serialization/QuickBenchmark.cs index e1002ae..489620d 100644 --- a/AyCode.Core.Tests/Serialization/QuickBenchmark.cs +++ b/AyCode.Core.Tests/Serialization/QuickBenchmark.cs @@ -602,7 +602,7 @@ public class QuickBenchmark for (int i = 0; i < DefaultIterations; i++) { var target = CreatePopulateTarget(testOrder); - AcBinaryDeserializer.PopulateMerge(acBinaryNoRef.AsSpan(), target); + AcBinaryDeserializer.PopulateMerge(acBinaryNoRef, target); } var acMergeMs = sw.Elapsed.TotalMilliseconds; @@ -744,7 +744,7 @@ public class QuickBenchmark AcBinaryDeserializer.Populate(binaryData, target); //Console.WriteLine("PopulateMerge"); - AcBinaryDeserializer.PopulateMerge(binaryData.AsSpan(), target); + AcBinaryDeserializer.PopulateMerge(binaryData, target); } Console.WriteLine($"Iterations: {DefaultIterations:N0}"); @@ -772,7 +772,7 @@ public class QuickBenchmark for (int i = 0; i < DefaultIterations; i++) { var target = CreatePopulateTarget(testOrder); - AcBinaryDeserializer.PopulateMerge(binaryData.AsSpan(), target); + AcBinaryDeserializer.PopulateMerge(binaryData, target); } var mergeMs = sw.Elapsed.TotalMilliseconds; @@ -782,7 +782,7 @@ public class QuickBenchmark for (int i = 0; i < DefaultIterations; i++) { var target = CreatePopulateTarget(testOrder); - AcBinaryDeserializer.PopulateMerge(binaryData.AsSpan(), target, mergeWithRemoveOptions); + AcBinaryDeserializer.PopulateMerge(binaryData, target, mergeWithRemoveOptions); } var mergeWithRemoveMs = sw.Elapsed.TotalMilliseconds; diff --git a/AyCode.Core/Extensions/SerializeObjectExtensions.cs b/AyCode.Core/Extensions/SerializeObjectExtensions.cs index fd8f066..b990eb6 100644 --- a/AyCode.Core/Extensions/SerializeObjectExtensions.cs +++ b/AyCode.Core/Extensions/SerializeObjectExtensions.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Reflection; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Runtime.Serialization; using System.Text; using AyCode.Core.Interfaces; @@ -589,97 +590,37 @@ public static class SerializeObjectExtensions public static T? BinaryTo(this byte[] data) => AcBinaryDeserializer.Deserialize(data); - /// - /// Deserialize binary data from ReadOnlySpan. - /// - public static T? BinaryTo(this ReadOnlySpan data) - => AcBinaryDeserializer.Deserialize(data); - - /// - /// Deserialize binary data from ReadOnlyMemory. - /// - public static T? BinaryTo(this ReadOnlyMemory data) - => AcBinaryDeserializer.Deserialize(data.Span); - /// /// Deserialize binary data to specified type. /// public static object? BinaryTo(this byte[] data, Type targetType) - => AcBinaryDeserializer.Deserialize(data.AsSpan(), targetType); - - /// - /// Deserialize binary data from ReadOnlySpan to specified type. - /// - public static object? BinaryTo(this ReadOnlySpan data, Type targetType) => AcBinaryDeserializer.Deserialize(data, targetType); - /// - /// Deserialize binary data from ReadOnlyMemory to specified type. - /// - public static object? BinaryTo(this ReadOnlyMemory data, Type targetType) - => AcBinaryDeserializer.Deserialize(data.Span, targetType); - /// /// Populate existing object from binary data. /// public static void BinaryTo(this byte[] data, T target) where T : class => AcBinaryDeserializer.Populate(data, target); - /// - /// Populate existing object from binary ReadOnlySpan. - /// - public static void BinaryTo(this ReadOnlySpan data, T target) where T : class - => AcBinaryDeserializer.Populate(data, target); - - /// - /// Populate existing object from binary ReadOnlyMemory. - /// - public static void BinaryTo(this ReadOnlyMemory data, T target) where T : class - => AcBinaryDeserializer.Populate(data.Span, target); - /// /// Populate existing object from binary data with merge semantics for IId collections. /// public static void BinaryToMerge(this byte[] data, T target) where T : class - => AcBinaryDeserializer.PopulateMerge(data.AsSpan(), target); - - /// - /// Populate existing object from binary ReadOnlySpan with merge semantics. - /// - public static void BinaryToMerge(this ReadOnlySpan data, T target) where T : class => AcBinaryDeserializer.PopulateMerge(data, target); - /// - /// Populate existing object from binary ReadOnlyMemory with merge semantics. - /// - public static void BinaryToMerge(this ReadOnlyMemory data, T target) where T : class - => AcBinaryDeserializer.PopulateMerge(data.Span, target); - /// /// Create a deserialize chain that parses binary data once and allows multiple deserializations. /// Efficient for deserializing the same binary to multiple different types. /// Use with 'using' statement or call Dispose() when done. /// public static IDeserializeChain BinaryToChain(this byte[] data) - => AcBinaryDeserializer.CreateDeserializeChain(data.AsSpan()); + => AcBinaryDeserializer.CreateDeserializeChain(data); /// /// Create a deserialize chain with options. /// public static IDeserializeChain BinaryToChain(this byte[] data, AcBinarySerializerOptions options) - => AcBinaryDeserializer.CreateDeserializeChain(data.AsSpan(), options); - - /// - /// Create a deserialize chain from ReadOnlyMemory. - /// - public static IDeserializeChain BinaryToChain(this ReadOnlyMemory data) - => AcBinaryDeserializer.CreateDeserializeChain(data.Span); - - /// - /// Create a deserialize chain from ReadOnlyMemory with options. - /// - public static IDeserializeChain BinaryToChain(this ReadOnlyMemory data, AcBinarySerializerOptions options) - => AcBinaryDeserializer.CreateDeserializeChain(data.Span, options); + => AcBinaryDeserializer.CreateDeserializeChain(data, options); /// /// Create a populate chain that parses binary data once and allows populating multiple objects. @@ -688,7 +629,7 @@ public static class SerializeObjectExtensions /// public static IDeserializeChain BinaryToChain(this byte[] data, T target) where T : class { - var chain = AcBinaryDeserializer.CreateDeserializeChain(data.AsSpan()); + var chain = AcBinaryDeserializer.CreateDeserializeChain(data); chain.ThenPopulate(target); return chain; } @@ -698,27 +639,7 @@ public static class SerializeObjectExtensions /// public static IDeserializeChain BinaryToChain(this byte[] data, T target, AcBinarySerializerOptions options) where T : class { - var chain = AcBinaryDeserializer.CreateDeserializeChain(data.AsSpan(), options); - chain.ThenPopulate(target); - return chain; - } - - /// - /// Create a populate chain from ReadOnlyMemory. - /// - public static IDeserializeChain BinaryToChain(this ReadOnlyMemory data, T target) where T : class - { - var chain = AcBinaryDeserializer.CreateDeserializeChain(data.Span); - chain.ThenPopulate(target); - return chain; - } - - /// - /// Create a populate chain from ReadOnlyMemory with options. - /// - public static IDeserializeChain BinaryToChain(this ReadOnlyMemory data, T target, AcBinarySerializerOptions options) where T : class - { - var chain = AcBinaryDeserializer.CreateDeserializeChain(data.Span, options); + var chain = AcBinaryDeserializer.CreateDeserializeChain(data, options); chain.ThenPopulate(target); return chain; } @@ -734,23 +655,25 @@ public static class SerializeObjectExtensions public static TDestination? CloneTo(this object? src) where TDestination : class { if (src == null) return null; - + var buffer = new ArrayBufferWriter(256); AcBinarySerializer.Serialize(src, buffer, AcBinarySerializerOptions.Default); - return AcBinaryDeserializer.Deserialize(buffer.WrittenSpan); + MemoryMarshal.TryGetArray(buffer.WrittenMemory, out var seg); + return AcBinaryDeserializer.Deserialize(seg.Array!, seg.Offset, seg.Count); } /// /// Copy object properties to target via binary serialization (zero intermediate byte[] allocation). - /// Uses ArrayBufferWriter to serialize directly into a buffer, then populates target from the span. + /// Uses ArrayBufferWriter to serialize directly into a buffer, then populates target from the backing array. /// public static void CopyTo(this object? src, object target) { if (src == null) return; - + var buffer = new ArrayBufferWriter(256); AcBinarySerializer.Serialize(src, buffer, AcBinarySerializerOptions.Default); - AcBinaryDeserializer.Populate(buffer.WrittenSpan, target); + MemoryMarshal.TryGetArray(buffer.WrittenMemory, out var seg); + AcBinaryDeserializer.Populate(seg.Array!, seg.Offset, seg.Count, target); } #endregion diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.BinaryDeserializationContext.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.BinaryDeserializationContext.cs index b8ebbe1..d6cd60b 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.BinaryDeserializationContext.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.BinaryDeserializationContext.cs @@ -96,8 +96,7 @@ public static partial class AcBinaryDeserializer // Intern cache index counter private int _nextCacheIndex; - // Linearized buffer for ReadOnlySequence input - private byte[]? _linearizedBuffer; + // Removed: _linearizedBuffer was used by InitFromSpan (eliminated — all paths now use byte[] directly) /// /// Inline metadata entries flat array. @@ -158,16 +157,7 @@ public static partial class AcBinaryDeserializer ChainTracker = null; } - /// - /// Initializes the context from a ReadOnlySpan by copying to a pooled linearized buffer, - /// then creating an ArrayBinaryInput. Used when TInput is ArrayBinaryInput. - /// - public void InitFromSpan(ReadOnlySpan data) - { - var buffer = RentLinearizedBuffer(data.Length); - data.CopyTo(buffer); - InitInput((TInput)(object)new ArrayBinaryInput(buffer, data.Length)); - } + // Removed: InitFromSpan — all deserialization now goes through byte[] directly (zero-copy). #region Header @@ -292,21 +282,7 @@ public static partial class AcBinaryDeserializer return _stringCache ??= new Dictionary(128); } - /// - /// Rents a linearized buffer for ReadOnlySequence multi-segment input. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal byte[] RentLinearizedBuffer(int minSize) - { - if (_linearizedBuffer != null && _linearizedBuffer.Length >= minSize) - return _linearizedBuffer; - - if (_linearizedBuffer != null) - ArrayPool.Shared.Return(_linearizedBuffer); - - _linearizedBuffer = ArrayPool.Shared.Rent(minSize); - return _linearizedBuffer; - } + // Removed: RentLinearizedBuffer — was only used by InitFromSpan (eliminated). public int[] RentDupData(int minLength) { @@ -482,12 +458,7 @@ public static partial class AcBinaryDeserializer _pooledDupDataLength = 0; } - // Linearized buffer: no GC roots (byte[]), keep small, return large - if (_linearizedBuffer != null && _linearizedBuffer.Length > SmallArrayThreshold) - { - ArrayPool.Shared.Return(_linearizedBuffer); - _linearizedBuffer = null; - } + // Removed: _linearizedBuffer cleanup — field eliminated with InitFromSpan. } #endregion diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.CrossType.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.CrossType.cs index b65a497..6a3179b 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.CrossType.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.CrossType.cs @@ -28,23 +28,19 @@ public static partial class AcBinaryDeserializer /// The destination type to deserialize into /// Binary data to deserialize /// Deserialized instance of TDest - public static TDest? Deserialize(ReadOnlySpan data) + public static TDest? Deserialize(byte[] data) => Deserialize(data, AcBinarySerializerOptions.Default); /// /// Deserializes binary data from TSource type to TDest type with options. /// Supports cross-type mapping with automatic property name matching or custom PropertyMapper. + /// Zero-copy: ArrayBinaryInput references the byte[] directly. /// - /// The source type that was serialized - /// The destination type to deserialize into - /// Binary data to deserialize - /// Deserialization options (use PropertyMapper for custom mapping) - /// Deserialized instance of TDest - public static TDest? Deserialize(ReadOnlySpan data, AcBinarySerializerOptions options) + public static TDest? Deserialize(byte[] data, AcBinarySerializerOptions options) { // Early exit checks - if (DeserializeCrossTypeBase.IsEmptyData(data, BinaryTypeCode.Null)) - return default; + if (data.Length == 0) return default; + if (data.Length == 1 && data[0] == BinaryTypeCode.Null) return default; var sourceType = typeof(TSource); var destType = typeof(TDest); @@ -56,7 +52,7 @@ public static partial class AcBinaryDeserializer // Cross-type path: use index mapping var indexMapping = GetIndexMapping(sourceType, destType, options.PropertyMapper); var context = DeserializationContextPool.Get(options); - context.InitFromSpan(data); + context.InitInput(new ArrayBinaryInput(data)); try { @@ -80,20 +76,6 @@ public static partial class AcBinaryDeserializer } } - /// - /// Deserializes binary data from TSource type to TDest type (byte array overload). - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static TDest? Deserialize(byte[] data) - => Deserialize(data.AsSpan()); - - /// - /// Deserializes binary data from TSource type to TDest type with options (byte array overload). - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static TDest? Deserialize(byte[] data, AcBinarySerializerOptions options) - => Deserialize(data.AsSpan(), options); - #endregion #region Cross-Type Populate @@ -106,28 +88,24 @@ public static partial class AcBinaryDeserializer /// The destination type to populate /// Binary data to deserialize /// Existing instance to populate - public static void Populate(ReadOnlySpan data, TDest target) + public static void Populate(byte[] data, TDest target) where TDest : class => Populate(data, target, AcBinarySerializerOptions.Default); /// /// Populates existing TDest instance with data serialized as TSource with options. /// Supports cross-type mapping with automatic property name matching or custom PropertyMapper. + /// Zero-copy: ArrayBinaryInput references the byte[] directly. /// - /// The source type that was serialized - /// The destination type to populate - /// Binary data to deserialize - /// Existing instance to populate - /// Deserialization options (use PropertyMapper for custom mapping) - public static void Populate(ReadOnlySpan data, TDest target, AcBinarySerializerOptions options) + public static void Populate(byte[] data, TDest target, AcBinarySerializerOptions options) where TDest : class { // Validation DeserializeCrossTypeBase.ValidatePopulateTarget(target); // Early exit checks - if (DeserializeCrossTypeBase.IsEmptyData(data, BinaryTypeCode.Null)) - return; + if (data.Length == 0) return; + if (data.Length == 1 && data[0] == BinaryTypeCode.Null) return; var sourceType = typeof(TSource); var destType = typeof(TDest); @@ -142,7 +120,7 @@ public static partial class AcBinaryDeserializer // Cross-type path: use index mapping var indexMapping = GetIndexMapping(sourceType, destType, options.PropertyMapper); var context = DeserializationContextPool.Get(options); - context.InitFromSpan(data); + context.InitInput(new ArrayBinaryInput(data)); try { @@ -183,22 +161,6 @@ public static partial class AcBinaryDeserializer } } - /// - /// Populates existing TDest instance with data serialized as TSource (byte array overload). - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void Populate(byte[] data, TDest target) - where TDest : class - => Populate(data.AsSpan(), target); - - /// - /// Populates existing TDest instance with data serialized as TSource with options (byte array overload). - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void Populate(byte[] data, TDest target, AcBinarySerializerOptions options) - where TDest : class - => Populate(data.AsSpan(), target, options); - #endregion #region Helper Methods diff --git a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs index fe5ff4e..5186ec9 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinaryDeserializer.cs @@ -180,24 +180,28 @@ public static partial class AcBinaryDeserializer } /// - /// Deserialize binary data to object of type T. + /// Deserialize binary data to object of type T from a sub-range of a byte[]. + /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. /// - public static T? Deserialize(ReadOnlySpan data) => Deserialize(data, AcBinarySerializerOptions.Default); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static T? Deserialize(byte[] data, int offset, int length) + => Deserialize(data, offset, length, AcBinarySerializerOptions.Default); /// - /// Deserialize binary data to object of type T with options. + /// Deserialize binary data to object of type T from a sub-range with options. + /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. /// - public static T? Deserialize(ReadOnlySpan data, AcBinarySerializerOptions options) + public static T? Deserialize(byte[] data, int offset, int length, AcBinarySerializerOptions options) { - if (data.Length == 0) return default; - if (data.Length == 1 && data[0] == BinaryTypeCode.Null) return default; + if (length == 0) return default; + if (length == 1 && data[offset] == BinaryTypeCode.Null) return default; var targetType = typeof(T); if (AcSerializerCommon.IsExpressionType(targetType)) - return (T?)(object?)DeserializeExpression(data, targetType, options); + return (T?)(object?)DeserializeExpression(data, offset, length, targetType, options); var context = DeserializationContextPool.Get(options); - context.InitFromSpan(data); + context.InitInput(new ArrayBinaryInput(data, offset, length)); try { return (T?)DeserializeCore(context, targetType); } finally { DeserializationContextPool.Return(context); } } @@ -205,22 +209,35 @@ public static partial class AcBinaryDeserializer /// /// Deserialize binary data to specified type. /// - public static object? Deserialize(ReadOnlySpan data, Type targetType) - => Deserialize(data, targetType, AcBinarySerializerOptions.Default); + public static object? Deserialize(byte[] data, Type targetType) + => Deserialize(data, 0, data.Length, targetType, AcBinarySerializerOptions.Default); /// /// Deserialize binary data to specified type with options. /// - public static object? Deserialize(ReadOnlySpan data, Type targetType, AcBinarySerializerOptions options) + public static object? Deserialize(byte[] data, Type targetType, AcBinarySerializerOptions options) + => Deserialize(data, 0, data.Length, targetType, options); + + /// + /// Deserialize binary data to specified type from a sub-range. + /// + public static object? Deserialize(byte[] data, int offset, int length, Type targetType) + => Deserialize(data, offset, length, targetType, AcBinarySerializerOptions.Default); + + /// + /// Deserialize binary data to specified type from a sub-range with options. + /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. + /// + public static object? Deserialize(byte[] data, int offset, int length, Type targetType, AcBinarySerializerOptions options) { - if (data.Length == 0) return null; - if (data.Length == 1 && data[0] == BinaryTypeCode.Null) return null; + if (length == 0) return null; + if (length == 1 && data[offset] == BinaryTypeCode.Null) return null; if (AcSerializerCommon.IsExpressionType(targetType)) - return DeserializeExpression(data, targetType, options); + return DeserializeExpression(data, offset, length, targetType, options); var context = DeserializationContextPool.Get(options); - context.InitFromSpan(data); + context.InitInput(new ArrayBinaryInput(data, offset, length)); try { return DeserializeCore(context, targetType); } finally { DeserializationContextPool.Return(context); } } @@ -240,8 +257,8 @@ public static partial class AcBinaryDeserializer { if (data.Length == 0) return default; - if (data.IsSingleSegment) - return Deserialize(data.FirstSpan, options); + if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg)) + return Deserialize(seg.Array!, seg.Offset, seg.Count, options); return DeserializeSequence(new SequenceBinaryInput(data), typeof(T), options); } @@ -259,8 +276,8 @@ public static partial class AcBinaryDeserializer { if (data.Length == 0) return null; - if (data.IsSingleSegment) - return Deserialize(data.FirstSpan, targetType, options); + if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg2)) + return Deserialize(seg2.Array!, seg2.Offset, seg2.Count, targetType, options); return DeserializeSequence(new SequenceBinaryInput(data), targetType, options); } @@ -272,7 +289,10 @@ public static partial class AcBinaryDeserializer where TInput : struct, IBinaryInputBase { if (AcSerializerCommon.IsExpressionType(targetType)) - return Deserialize(LinearizeSequence(input), options); + { + var (buf, off, len) = LinearizeSequence(input); + return Deserialize(buf, off, len, options); + } var context = DeserializationContextPool.Get(options); context.InitInput(input); @@ -287,7 +307,10 @@ public static partial class AcBinaryDeserializer where TInput : struct, IBinaryInputBase { if (AcSerializerCommon.IsExpressionType(targetType)) - return Deserialize(LinearizeSequence(input), targetType, options); + { + var (buf, off, len) = LinearizeSequence(input); + return Deserialize(buf, off, len, targetType, options); + } var context = DeserializationContextPool.Get(options); context.InitInput(input); @@ -296,13 +319,13 @@ public static partial class AcBinaryDeserializer } /// - /// Fallback: linearize a TInput into a contiguous ReadOnlySpan (for Expression deserialization). + /// Fallback: linearize a TInput into a contiguous byte[] range (for Expression deserialization). /// - private static ReadOnlySpan LinearizeSequence(TInput input) + private static (byte[] buffer, int offset, int length) LinearizeSequence(TInput input) where TInput : struct, IBinaryInputBase { input.Initialize(out var buffer, out var position, out var bufferLength); - return buffer.AsSpan(position, bufferLength - position); + return (buffer, position, bufferLength - position); } /// @@ -329,10 +352,16 @@ public static partial class AcBinaryDeserializer /// /// Deserialize Expression from binary data. /// - private static Expression? DeserializeExpression(ReadOnlySpan data, Type targetExpressionType, AcBinarySerializerOptions options) + private static Expression? DeserializeExpression(byte[] data, Type targetExpressionType, AcBinarySerializerOptions options) + => DeserializeExpression(data, 0, data.Length, targetExpressionType, options); + + /// + /// Deserialize Expression from binary data sub-range. + /// + private static Expression? DeserializeExpression(byte[] data, int offset, int length, Type targetExpressionType, AcBinarySerializerOptions options) { var context = DeserializationContextPool.Get(options); - context.InitFromSpan(data); + context.InitInput(new ArrayBinaryInput(data, offset, length)); try { var node = (AcExpressionNode?)DeserializeCore(context, typeof(AcExpressionNode)); @@ -437,25 +466,27 @@ public static partial class AcBinaryDeserializer /// Populate existing object from binary data with options. /// public static void Populate(byte[] data, T target, AcBinarySerializerOptions options) where T : class - => Populate(data.AsSpan(), target, options); + => Populate(data, 0, data.Length, target, options); /// - /// Populate existing object from binary data. + /// Populate existing object from binary data sub-range. /// - public static void Populate(ReadOnlySpan data, T target) where T : class - => Populate(data, target, AcBinarySerializerOptions.Default); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void Populate(byte[] data, int offset, int length, T target) where T : class + => Populate(data, offset, length, target, AcBinarySerializerOptions.Default); /// - /// Populate existing object from binary data with options. + /// Populate existing object from binary data sub-range with options. + /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. /// - public static void Populate(ReadOnlySpan data, T target, AcBinarySerializerOptions options) where T : class + public static void Populate(byte[] data, int offset, int length, T target, AcBinarySerializerOptions options) where T : class { ArgumentNullException.ThrowIfNull(target); - if (data.Length == 0) return; - if (data.Length == 1 && data[0] == BinaryTypeCode.Null) return; + if (length == 0) return; + if (length == 1 && data[offset] == BinaryTypeCode.Null) return; var context = DeserializationContextPool.Get(options); - context.InitFromSpan(data); + context.InitInput(new ArrayBinaryInput(data, offset, length)); try { PopulateCore(context, target); } finally { DeserializationContextPool.Return(context); } } @@ -464,46 +495,35 @@ public static partial class AcBinaryDeserializer /// Populate with merge semantics for IId collections from byte[] (zero-copy). /// public static void PopulateMerge(byte[] data, T target) where T : class - { - ArgumentNullException.ThrowIfNull(target); - if (data.Length == 0) return; - if (data.Length == 1 && data[0] == BinaryTypeCode.Null) return; - - var context = DeserializationContextPool.Get(AcBinarySerializerOptions.Default); - context.InitInput(new ArrayBinaryInput(data)); - context.IsMergeMode = true; - context.RemoveOrphanedItems = AcBinarySerializerOptions.Default.RemoveOrphanedItems; - try - { - PopulateMergeCore(context, target); - } - finally - { - DeserializationContextPool.Return(context); - } - } - /// - /// Populate with merge semantics for IId collections. - /// - public static void PopulateMerge(ReadOnlySpan data, T target) where T : class - => PopulateMerge(data, target, AcBinarySerializerOptions.Default); + => PopulateMerge(data, 0, data.Length, target, AcBinarySerializerOptions.Default); /// - /// Populate with merge semantics for IId collections. + /// Populate with merge semantics for IId collections from byte[] with options (zero-copy). /// - /// Binary data to deserialize - /// Target object to populate - /// Optional serializer options. When RemoveOrphanedItems is true, - /// items in destination collections that have no matching Id in source will be removed. - public static void PopulateMerge(ReadOnlySpan data, T target, AcBinarySerializerOptions? options) where T : class + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void PopulateMerge(byte[] data, T target, AcBinarySerializerOptions options) where T : class + => PopulateMerge(data, 0, data.Length, target, options); + + /// + /// Populate with merge semantics for IId collections from a sub-range of byte[]. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void PopulateMerge(byte[] data, int offset, int length, T target) where T : class + => PopulateMerge(data, offset, length, target, AcBinarySerializerOptions.Default); + + /// + /// Populate with merge semantics for IId collections from a sub-range with options. + /// Zero-copy: ArrayBinaryInput references the byte[] directly with offset. + /// + public static void PopulateMerge(byte[] data, int offset, int length, T target, AcBinarySerializerOptions? options) where T : class { ArgumentNullException.ThrowIfNull(target); - if (data.Length == 0) return; - if (data.Length == 1 && data[0] == BinaryTypeCode.Null) return; + if (length == 0) return; + if (length == 1 && data[offset] == BinaryTypeCode.Null) return; var opts = options ?? AcBinarySerializerOptions.Default; var context = DeserializationContextPool.Get(opts); - context.InitFromSpan(data); + context.InitInput(new ArrayBinaryInput(data, offset, length)); context.IsMergeMode = true; context.RemoveOrphanedItems = opts.RemoveOrphanedItems; @@ -606,27 +626,26 @@ public static partial class AcBinaryDeserializer /// Create a deserialize chain that parses binary data once and allows multiple deserializations. /// Maintains reference identity for IId objects across chain operations. /// - public static IDeserializeChain CreateDeserializeChain(ReadOnlySpan data) + public static IDeserializeChain CreateDeserializeChain(byte[] data) => CreateDeserializeChain(data, AcBinarySerializerOptions.Default); /// /// Create a deserialize chain with options. /// - public static IDeserializeChain CreateDeserializeChain(ReadOnlySpan data, AcBinarySerializerOptions options) + public static IDeserializeChain CreateDeserializeChain(byte[] data, AcBinarySerializerOptions options) { if (data.Length == 0 || (data.Length == 1 && data[0] == BinaryTypeCode.Null)) return EmptyDeserializeChain.Instance; - var dataArray = data.ToArray(); var chainTracker = new AcSerializerCommon.ChainReferenceTracker(); var context = DeserializationContextPool.Get(options); - context.InitInput(new ArrayBinaryInput(dataArray)); + context.InitInput(new ArrayBinaryInput(data)); context.ChainTracker = chainTracker; try { var result = (T?)DeserializeCore(context, typeof(T)); - return new BinaryDeserializeChain(dataArray, options, chainTracker, result); + return new BinaryDeserializeChain(data, options, chainTracker, result); } finally { diff --git a/AyCode.Core/Serializers/Binaries/ArrayBinaryInput.cs b/AyCode.Core/Serializers/Binaries/ArrayBinaryInput.cs index c7e2014..8514427 100644 --- a/AyCode.Core/Serializers/Binaries/ArrayBinaryInput.cs +++ b/AyCode.Core/Serializers/Binaries/ArrayBinaryInput.cs @@ -13,27 +13,33 @@ namespace AyCode.Core.Serializers.Binaries; public struct ArrayBinaryInput : IBinaryInputBase { private readonly byte[] _data; + private readonly int _offset; private readonly int _length; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public ArrayBinaryInput(byte[] data, int length) + public ArrayBinaryInput(byte[] data, int offset, int length) { _data = data; + _offset = offset; _length = length; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public ArrayBinaryInput(byte[] data) : this(data, data.Length) { } + public ArrayBinaryInput(byte[] data, int length) : this(data, 0, length) { } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ArrayBinaryInput(byte[] data) : this(data, 0, data.Length) { } /// /// Provides the buffer directly — zero copy. + /// Position starts at offset, bufferLength = offset + length. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Initialize(out byte[] buffer, out int position, out int bufferLength) { buffer = _data; - position = 0; - bufferLength = _length; + position = _offset; + bufferLength = _offset + _length; } /// diff --git a/AyCode.Services.Server.Tests/SignalRs/SignalRTestHelper.cs b/AyCode.Services.Server.Tests/SignalRs/SignalRTestHelper.cs index 8df7ad9..9d6fafc 100644 --- a/AyCode.Services.Server.Tests/SignalRs/SignalRTestHelper.cs +++ b/AyCode.Services.Server.Tests/SignalRs/SignalRTestHelper.cs @@ -44,7 +44,7 @@ public static class SignalRTestHelper public static T? GetResponseData(SentMessage sentMessage) { - if (sentMessage.Message is SignalResponseDataMessage dataResponse && dataResponse.ResponseData != null) + if (sentMessage.Message is SignalResponseDataMessage dataResponse && dataResponse.RawResponseData != null) return dataResponse.GetResponseData(); return default; diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs index 286e720..2fdcbf1 100644 --- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs +++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRClient2.cs @@ -29,7 +29,7 @@ public class TestableSignalRClient2 : AcSignalRClientBase, IAcSignalRHubItemServ #region Override virtual methods for testing - protected override async Task MessageReceived(int messageTag, SignalParams signalParams, SignalData data) + protected override Task MessageReceived(int messageTag, SignalParams signalParams, object data) { throw new NotImplementedException(); } @@ -52,9 +52,9 @@ public class TestableSignalRClient2 : AcSignalRClientBase, IAcSignalRHubItemServ protected override ValueTask DisposeConnectionInternal() => ValueTask.CompletedTask; - protected override async Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, byte[]? messageBytes) + protected override Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, object? data) { - await _signalRHub.OnReceiveMessage(messageTag, requestId, signalParams, new SignalData(messageBytes ?? [])); + return _signalRHub.OnReceiveMessage(messageTag, requestId, signalParams, data ?? Array.Empty()); } #endregion diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs index 8346b29..3aa4b3b 100644 --- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs +++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs @@ -1,6 +1,4 @@ using System.Security.Claims; -using AyCode.Core.Extensions; -using AyCode.Core.Helpers; using AyCode.Core.Serializers; using AyCode.Core.Tests.TestModels; using AyCode.Models.Server.DynamicMethods; @@ -13,6 +11,8 @@ namespace AyCode.Services.Server.Tests.SignalRs; /// /// Testable SignalR hub that overrides infrastructure dependencies. /// Enables unit testing without SignalR server or mocks. +/// Uses base SendMessageToClient which sends raw objects directly. +/// GetResponseData<T>() handles deserialization with 3-tier fallback. /// public class TestableSignalRHub2 : AcWebSignalRHubBase { @@ -85,10 +85,10 @@ public class TestableSignalRHub2 : AcWebSignalRHubBase SendMessageToClient(_callerClient, messageTag, message, requestId); + protected override Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId) + => SendMessageToClient(_callerClient, messageTag, status, responseData, requestId); #endregion } diff --git a/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs b/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs index 66ba1a2..86601eb 100644 --- a/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs +++ b/AyCode.Services.Server/SignalRs/AcSignalRDataSource.cs @@ -9,6 +9,7 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using AyCode.Core.Compression; using AyCode.Core.Serializers; +using AyCode.Core.Serializers.Binaries; namespace AyCode.Services.Server.SignalRs { @@ -265,10 +266,12 @@ namespace AyCode.Services.Server.SignalRs BeginSync(); try { - var responseData = (await SignalRClient.GetAllAsync(SignalRCrudTags.GetAllMessageTag, GetContextParams())) - ?? throw new NullReferenceException(); + var response = await SignalRClient.GetAllAsync(SignalRCrudTags.GetAllMessageTag, GetContextParams()); + if (response?.Status != SignalResponseStatus.Success || response.RawResponseData == null) + throw new NullReferenceException($"LoadDataSource; Status: {response?.Status}"); - await LoadDataSource(responseData, false, false, clearChangeTracking); + await LoadDataSourceFromResponseData(response.RawResponseData, response.DataSerializerType, + false, false, clearChangeTracking); } finally { @@ -278,25 +281,24 @@ namespace AyCode.Services.Server.SignalRs /// /// GetAllMessageTag - Async callback version with optimized direct populate. - /// Uses SignalResponseDataMessage to avoid double deserialization. + /// Protocol deserializes directly to TIList — no intermediate byte[] or SignalData. /// public Task LoadDataSourceAsync(bool clearChangeTracking = true) { - if (SignalRCrudTags.GetAllMessageTag == AcSignalRTags.None) + if (SignalRCrudTags.GetAllMessageTag == AcSignalRTags.None) throw new ArgumentException($"SignalRCrudTags.GetAllMessageTag == SignalRTags.None"); BeginSync(); - // Request SignalResponseDataMessage directly to avoid deserializing ResponseData return SignalRClient.GetAllAsync(SignalRCrudTags.GetAllMessageTag, GetContextParams()) .ContinueWith(async responseTask => { try { var response = await responseTask; - if (response?.Status != SignalResponseStatus.Success || response.ResponseData == null) + if (response?.Status != SignalResponseStatus.Success || response.RawResponseData == null) throw new NullReferenceException($"LoadDataSourceAsync; Status: {response?.Status}"); - await LoadDataSourceFromResponseData(response.ResponseData, response.DataSerializerType, + await LoadDataSourceFromResponseData(response.RawResponseData, response.DataSerializerType, false, false, clearChangeTracking); } finally @@ -307,25 +309,79 @@ namespace AyCode.Services.Server.SignalRs } /// - /// Loads data source directly from ResponseData byte[], avoiding double deserialization. + /// Loads data source from response data. + /// responseData is either a typed object (protocol deserialized) or byte[] (raw path). /// - public async Task LoadDataSourceFromResponseData(SignalData responseData, AcSerializerType serializerType, + public async Task LoadDataSourceFromResponseData(object responseData, AcSerializerType serializerType, bool refreshDataFromDbAsync = false, bool setSourceToWorkingReferenceList = false, bool clearChangeTracking = true) { await _asyncLock.WaitAsync(); try { - if (!setSourceToWorkingReferenceList) + if (responseData is byte[] rawBytes) { - // Direct populate into existing InnerList - if (serializerType == AcSerializerType.Binary) + // Raw byte[] path — populate from binary bytes + if (!setSourceToWorkingReferenceList) + { + if (serializerType == AcSerializerType.Binary) + { + if (InnerList is IAcObservableCollection observable) + { + observable.BeginUpdate(); + try + { + AcBinaryDeserializer.PopulateMerge(rawBytes, 0, rawBytes.Length, InnerList); + } + finally + { + observable.EndUpdate(); + } + } + else + { + AcBinaryDeserializer.Populate(rawBytes, 0, rawBytes.Length, InnerList); + } + } + else + { + var json = GzipHelper.DecompressToString(rawBytes); + if (InnerList is IAcObservableCollection observable) + { + observable.PopulateFromJson(json); + } + else + { + json.JsonTo(InnerList); + } + } + } + else + { + TIList? fromSource; + if (serializerType == AcSerializerType.Binary) + fromSource = AcBinaryDeserializer.Deserialize(rawBytes, 0, rawBytes.Length); + else + fromSource = GzipHelper.DecompressToString(rawBytes).JsonTo(); + + if (fromSource != null) + { + ClearUnsafe(clearChangeTracking); + SetWorkingReferenceListUnsafe(fromSource); + } + } + } + else if (responseData is TIList typedList) + { + // Typed object path — protocol already deserialized to target type + if (!setSourceToWorkingReferenceList) { if (InnerList is IAcObservableCollection observable) { observable.BeginUpdate(); try { - responseData.Span.BinaryToMerge(InnerList); + InnerList.Clear(); + foreach (var item in typedList) InnerList.Add(item); } finally { @@ -334,36 +390,42 @@ namespace AyCode.Services.Server.SignalRs } else { - responseData.Span.BinaryTo(InnerList); + InnerList.Clear(); + foreach (var item in typedList) InnerList.Add(item); } } else { - // JSON mode - decompress GZip first (no span overload for DecompressToString) - var json = GzipHelper.DecompressToString(responseData.ToArray()); - if (InnerList is IAcObservableCollection observable) - { - observable.PopulateFromJson(json); - } - else - { - json.JsonTo(InnerList); - } + ClearUnsafe(clearChangeTracking); + SetWorkingReferenceListUnsafe(typedList); } } else { - // Deserialize to new list and set as reference - TIList? fromSource; - if (serializerType == AcSerializerType.Binary) - fromSource = responseData.Span.BinaryTo(); - else - fromSource = GzipHelper.DecompressToString(responseData.ToArray()).JsonTo(); - - if (fromSource != null) + // Fallback: incompatible collection type (e.g., List in test scenarios without protocol). + // Re-serialize to byte[] then process inline. + var reBytes = AcBinarySerializer.Serialize(responseData); + if (!setSourceToWorkingReferenceList) { - ClearUnsafe(clearChangeTracking); - SetWorkingReferenceListUnsafe(fromSource); + if (InnerList is IAcObservableCollection observable2) + { + observable2.BeginUpdate(); + try { AcBinaryDeserializer.PopulateMerge(reBytes, 0, reBytes.Length, InnerList); } + finally { observable2.EndUpdate(); } + } + else + { + AcBinaryDeserializer.Populate(reBytes, 0, reBytes.Length, InnerList); + } + } + else + { + var fromSource = AcBinaryDeserializer.Deserialize(reBytes, 0, reBytes.Length); + if (fromSource != null) + { + ClearUnsafe(clearChangeTracking); + SetWorkingReferenceListUnsafe(fromSource); + } } } @@ -962,7 +1024,7 @@ namespace AyCode.Services.Server.SignalRs return SignalRClient.PostDataAsync(messageTag, item, response => { - if (response.Status != SignalResponseStatus.Success || response.ResponseData == null) + if (response.Status != SignalResponseStatus.Success || response.RawResponseData == null) { if (TryRollbackItem(item.Id, out _)) return; throw new NullReferenceException($"SaveItemUnsafeAsync; Status: {response.Status}"); diff --git a/AyCode.Services.Server/SignalRs/AcSignalRSendToClientService.cs b/AyCode.Services.Server/SignalRs/AcSignalRSendToClientService.cs index 47a63ad..cce7f7e 100644 --- a/AyCode.Services.Server/SignalRs/AcSignalRSendToClientService.cs +++ b/AyCode.Services.Server/SignalRs/AcSignalRSendToClientService.cs @@ -1,8 +1,6 @@ using AyCode.Core.Extensions; using AyCode.Core.Helpers; using AyCode.Core.Loggers; -using AyCode.Core.Serializers.Binaries; -using AyCode.Core.Serializers.Jsons; using AyCode.Services.SignalRs; using Microsoft.AspNetCore.SignalR; @@ -15,16 +13,15 @@ public abstract class AcSignalRSendToClientService(messageTag)}"); - await sendTo.OnReceiveMessage(messageTag, null, signalParams, responseData); + Logger.Info($"Server sending to client; {ConstHelper.NameByValue(messageTag)}"); + await sendTo.OnReceiveMessage(messageTag, null, signalParams, content!); } public virtual Task SendMessageToAllClients(int messageTag, object? content) diff --git a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs index 9e0eec5..d97db0f 100644 --- a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs +++ b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs @@ -1,12 +1,10 @@ -using System.Buffers; -using System.Security.Claims; +using System.Security.Claims; using AyCode.Core; using AyCode.Core.Extensions; using AyCode.Core.Helpers; using AyCode.Core.Loggers; using AyCode.Core.Serializers; using AyCode.Core.Serializers.Binaries; -using AyCode.Core.Serializers.Jsons; using AyCode.Models.Server.DynamicMethods; using AyCode.Services.SignalRs; using Microsoft.AspNetCore.SignalR; @@ -27,20 +25,11 @@ public abstract class AcWebSignalRHubBase(IConfiguration protected AcSerializerOptions SerializerOptions = new AcBinarySerializerOptions(); - /// - /// Enable diagnostic logging for binary serialization debugging. - /// Set to true to log hex dumps of serialized response data. - /// - public static bool EnableBinaryDiagnostics { get; set; } = false; #region Connection Lifecycle public override async Task OnConnectedAsync() { - // Enable protocol diagnostics to debug deserialization issues - if (EnableBinaryDiagnostics) - AcBinaryHubProtocol.DiagnosticLogger ??= msg => Logger.Info(msg); - Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}"); LogContextUserNameAndId(); await base.OnConnectedAsync(); @@ -64,7 +53,7 @@ public abstract class AcWebSignalRHubBase(IConfiguration #region Message Processing - public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, SignalData data) + public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, object data) { return ProcessOnReceiveMessage(messageTag, signalParams, requestId, null); } @@ -111,7 +100,7 @@ public abstract class AcWebSignalRHubBase(IConfiguration else { Logger.Warning($"Method '{tagName}' does not return IAsyncEnumerable. Returning normal message as single chunk."); - var responseMessage = CreateResponseMessage(messageTag, SignalResponseStatus.Success, responseData); + var responseMessage = CreateStreamResponseMessage(messageTag, SignalResponseStatus.Success, responseData); yield return SignalRSerializationHelper.SerializeToBinary(responseMessage); } } @@ -153,7 +142,7 @@ public abstract class AcWebSignalRHubBase(IConfiguration } else { - var msg = CreateResponseMessage(messageTag, SignalResponseStatus.Success, item); + var msg = CreateStreamResponseMessage(messageTag, SignalResponseStatus.Success, item); yield return SignalRSerializationHelper.SerializeToBinary(msg); } } @@ -185,24 +174,13 @@ public abstract class AcWebSignalRHubBase(IConfiguration if (TryFindAndInvokeMethod(messageTag, signalParams, tagName, out var responseData)) { - var responseMessage = CreateResponseMessage(messageTag, SignalResponseStatus.Success, responseData); - if (Logger.LogLevel <= LogLevel.Debug) - { - var responseSize = GetResponseSize(responseMessage); - Logger.Debug($"[{responseSize / 1024}kb] responseData serialized ({SerializerOptions.SerializerType})"); - } + Logger.Debug($"responseData ready ({SerializerOptions.SerializerType})"); - // Log binary diagnostics if enabled - if (EnableBinaryDiagnostics && responseMessage is SignalResponseDataMessage dataMsg && dataMsg.ResponseData is { IsEmpty: false }) - { - LogResponseDataDiagnostics(messageTag, tagName, requestId, dataMsg.ResponseData.ToArray()); - } - - await ResponseToCaller(messageTag, responseMessage, requestId); + await ResponseToCaller(messageTag, SignalResponseStatus.Success, responseData, requestId); return; } - + Logger.Warning($"Not found dynamic method for the tag! {tagName}"); notFoundCallback?.Invoke(tagName); } @@ -211,183 +189,15 @@ public abstract class AcWebSignalRHubBase(IConfiguration Logger.Error($"Server OnReceiveMessage; {ex.Message}; {tagName}", ex); } - await ResponseToCaller(messageTag, CreateResponseMessage(messageTag, SignalResponseStatus.Error, null), requestId); + await ResponseToCaller(messageTag, SignalResponseStatus.Error, null, requestId); } /// - /// Reads a VarUInt from byte array at given position. + /// Creates a SignalResponseDataMessage for stream path (serialized as wire format blob). + /// Main send path uses SendMessageToClient directly — no wrapper needed. /// - private static (uint value, int bytesRead) ReadVarUINTFromBytes(byte[] data, int startPos) - { - uint value = 0; - var shift = 0; - var bytesRead = 0; - - while (startPos + bytesRead < data.Length) - { - var b = data[startPos + bytesRead]; - bytesRead++; - value |= (uint)(b & 0x7F) << shift; - if ((b & 0x80) == 0) - break; - shift += 7; - if (shift > 35) - break; - } - - return (value, bytesRead); - } - - /// - /// Logs type information about the response data before serialization. - /// - private void LogResponseDataTypeInfo(object responseData) - { - try - { - var type = responseData.GetType(); - Logger.Info($"=== SERVER RESPONSE TYPE INFO (BEFORE SERIALIZE) ==="); - Logger.Info($"Runtime Type: {type.Name}"); - Logger.Info($"FullName: {type.FullName}"); - Logger.Info($"Namespace: {type.Namespace}"); - Logger.Info($"Assembly: {type.Assembly.GetName().Name} v{type.Assembly.GetName().Version}"); - Logger.Info($"AssemblyQualifiedName: {type.AssemblyQualifiedName}"); - Logger.Info($"Assembly Location: {type.Assembly.Location}"); - - // For collections, log element type info - if (type.IsGenericType) - { - var genericArgs = type.GetGenericArguments(); - Logger.Info($"Generic Arguments: [{string.Join(", ", genericArgs.Select(t => t.FullName))}]"); - - if (genericArgs.Length == 1) - { - var elementType = genericArgs[0]; - Logger.Info($"--- ELEMENT TYPE INFO ---"); - Logger.Info($"Element Type: {elementType.Name}"); - Logger.Info($"Element FullName: {elementType.FullName}"); - Logger.Info($"Element Namespace: {elementType.Namespace}"); - Logger.Info($"Element Assembly: {elementType.Assembly.GetName().Name} v{elementType.Assembly.GetName().Version}"); - Logger.Info($"Element AssemblyQualifiedName: {elementType.AssemblyQualifiedName}"); - Logger.Info($"Element Assembly Location: {elementType.Assembly.Location}"); - Logger.Info($"Element BaseType: {elementType.BaseType?.FullName ?? "null"}"); - - // Log inheritance chain - var baseType = elementType.BaseType; - var inheritanceChain = new List(); - while (baseType != null && baseType != typeof(object)) - { - inheritanceChain.Add($"{baseType.Name} ({baseType.Assembly.GetName().Name})"); - baseType = baseType.BaseType; - } - if (inheritanceChain.Count > 0) - { - Logger.Info($"Element Inheritance: {string.Join(" -> ", inheritanceChain)}"); - } - - LogTypePropertiesServer(elementType, "Element"); - } - } - else - { - Logger.Info($"BaseType: {type.BaseType?.FullName ?? "null"}"); - LogTypePropertiesServer(type, "Response"); - } - } - catch (Exception ex) - { - Logger.Warning($"Failed to log response type info: {ex.Message}"); - } - } - - /// - /// Logs all properties of a type with their declaring types. - /// - private void LogTypePropertiesServer(Type type, string prefix) - { - var props = type.GetProperties(System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance) - .Where(p => p.CanRead && p.GetIndexParameters().Length == 0) - .ToArray(); - - // Log in declaration order (not alphabetically) - Logger.Info($"{prefix} Property Count: {props.Length}"); - for (var i = 0; i < props.Length; i++) - { - var p = props[i]; - var declaringType = p.DeclaringType?.Name ?? "?"; - var declaringAssembly = p.DeclaringType?.Assembly.GetName().Name ?? "?"; - Logger.Info($" {prefix}[{i}]: {p.Name} : {p.PropertyType.Name} (declared in {declaringType} @ {declaringAssembly})"); - } - } - - /// - /// Logs diagnostic information about the ResponseData binary for debugging serialization issues. - /// - private void LogResponseDataDiagnostics(int messageTag, string tagName, int? requestId, byte[] responseData) - { - try - { - Logger.Info($"=== SERVER RESPONSE DATA DIAGNOSTICS (AFTER SERIALIZE) ==="); - Logger.Info($"Tag: {messageTag} ({tagName}); RequestId: {requestId}; ResponseData.Length: {responseData.Length}"); - Logger.Info($"HEX (first 500 bytes): {Convert.ToHexString(responseData.AsSpan(0, Math.Min(500, responseData.Length)))}"); - - if (responseData.Length >= 3) - { - var version = responseData[0]; - var marker = responseData[1]; - Logger.Info($"Version: {version}; Marker: 0x{marker:X2}"); - - if ((marker & 0x10) != 0) - { - // Read property count as VarUInt - var pos = 2; - var (propCount, bytesRead) = ReadVarUINTFromBytes(responseData, pos); - pos += bytesRead; - - Logger.Info($"Header property count: {propCount}"); - - for (var i = 0; i < (int)propCount && pos < responseData.Length; i++) - { - // Read string length as VarUInt - var (strLen, strLenBytes) = ReadVarUINTFromBytes(responseData, pos); - pos += strLenBytes; - - if (pos + (int)strLen <= responseData.Length) - { - var propName = System.Text.Encoding.UTF8.GetString(responseData, pos, (int)strLen); - pos += (int)strLen; - Logger.Info($" Header[{i}]: '{propName}'"); - } - else - { - Logger.Info($" Header[{i}]: "); - break; - } - } - } - } - } - catch (Exception ex) - { - Logger.Warning($"Failed to log response data diagnostics: {ex.Message}"); - } - } - - /// - /// Creates a response message using the configured serializer. - /// - protected virtual ISignalRMessage CreateResponseMessage(int messageTag, SignalResponseStatus status, object? responseData) - { - return new SignalResponseDataMessage(messageTag, status, responseData, SerializerOptions); - } - - /// - /// Gets the size of the response data for logging purposes. - /// - private static int GetResponseSize(ISignalRMessage responseMessage) - { - return responseMessage is SignalResponseDataMessage dataMsg ? dataMsg.ResponseData?.Length ?? 0 : 0; - } + protected SignalResponseDataMessage CreateStreamResponseMessage(int messageTag, SignalResponseStatus status, object? responseData) + => new(messageTag, status, responseData, SerializerOptions); /// /// Finds and invokes the method registered for the given message tag. @@ -410,11 +220,6 @@ public abstract class AcWebSignalRHubBase(IConfiguration responseData = methodInfoModel.MethodInfo.InvokeMethod(instance, paramValues); - if (EnableBinaryDiagnostics && responseData != null) - { - LogResponseDataTypeInfo(responseData); - } - if (methodInfoModel.Attribute.SendToOtherClientType != SendToClientType.None) SendMessageToOthers(methodInfoModel.Attribute.SendToOtherClientTag, responseData).Forget(); @@ -456,47 +261,41 @@ public abstract class AcWebSignalRHubBase(IConfiguration #region Response Methods protected virtual Task ResponseToCallerWithContent(int messageTag, object? content) - => ResponseToCaller(messageTag, CreateResponseMessage(messageTag, SignalResponseStatus.Success, content), null); + => SendMessageToClient(Clients.Caller, messageTag, SignalResponseStatus.Success, content); - protected virtual Task ResponseToCaller(int messageTag, ISignalRMessage message, int? requestId) - => SendMessageToClient(Clients.Caller, messageTag, message, requestId); + protected virtual Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId) + => SendMessageToClient(Clients.Caller, messageTag, status, responseData, requestId); protected virtual Task SendMessageToUserIdWithContent(string userId, int messageTag, object? content) - => SendMessageToUserIdInternal(userId, messageTag, CreateResponseMessage(messageTag, SignalResponseStatus.Success, content), null); - - protected virtual Task SendMessageToUserIdInternal(string userId, int messageTag, ISignalRMessage message, int? requestId) - => SendMessageToClient(Clients.User(userId), messageTag, message, requestId); + => SendMessageToClient(Clients.User(userId), messageTag, SignalResponseStatus.Success, content); protected virtual Task SendMessageToConnectionIdWithContent(string connectionId, int messageTag, object? content) - => SendMessageToConnectionIdInternal(connectionId, messageTag, CreateResponseMessage(messageTag, SignalResponseStatus.Success, content), null); - - protected virtual Task SendMessageToConnectionIdInternal(string connectionId, int messageTag, ISignalRMessage message, int? requestId) - => SendMessageToClient(Clients.Client(connectionId), messageTag, message, requestId); + => SendMessageToClient(Clients.Client(connectionId), messageTag, SignalResponseStatus.Success, content); protected virtual Task SendMessageToOthers(int messageTag, object? content) - => SendMessageToClient(Clients.Others, messageTag, CreateResponseMessage(messageTag, SignalResponseStatus.Success, content), null); + => SendMessageToClient(Clients.Others, messageTag, SignalResponseStatus.Success, content); protected virtual Task SendMessageToAll(int messageTag, object? content) - => SendMessageToClient(Clients.All, messageTag, CreateResponseMessage(messageTag, SignalResponseStatus.Success, content), null); - + => SendMessageToClient(Clients.All, messageTag, SignalResponseStatus.Success, content); + /// - /// Sends message to client using Binary serialization. + /// Sends message to client. Protocol serializes responseData directly to pipe (zero-copy write). /// - protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, ISignalRMessage message, int? requestId = null) + protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, + SignalResponseStatus status, object? responseData, int? requestId = null) { - var responseMessage = (SignalResponseDataMessage)message; var signalParams = new SignalParams { - Status = responseMessage.Status, - DataSerializerType = responseMessage.DataSerializerType + Status = status, + DataSerializerType = SerializerOptions.SerializerType, + SignalDataType = responseData?.GetType().AssemblyQualifiedName }; - var responseData = responseMessage.ResponseData ?? new SignalData([]); var tagName = ConstHelper.NameByValue(messageTag); - Logger.Debug($"[{responseData.Length / 1024}kb] Server sending message to client; requestId: {requestId}; Aborted: {IsConnectionAborted()}; ConnectionId: {GetConnectionId()}; {tagName}"); + Logger.Debug($"Server sending message to client; requestId: {requestId}; Aborted: {IsConnectionAborted()}; ConnectionId: {GetConnectionId()}; {tagName}"); - await sendTo.OnReceiveMessage(messageTag, requestId, signalParams, responseData); + await sendTo.OnReceiveMessage(messageTag, requestId, signalParams, responseData!); Logger.Debug($"Server sent message to client; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}"); } diff --git a/AyCode.Services.Tests/SignalRs/PostJsonDataMessageTests.cs b/AyCode.Services.Tests/SignalRs/PostJsonDataMessageTests.cs index b2cb617..fb3c746 100644 --- a/AyCode.Services.Tests/SignalRs/PostJsonDataMessageTests.cs +++ b/AyCode.Services.Tests/SignalRs/PostJsonDataMessageTests.cs @@ -42,15 +42,14 @@ public class PostJsonDataMessageTests Assert.IsNotNull(serverParams); Assert.AreEqual(testValue, serverParams![0]); - // Response round-trip (SignalResponseDataMessage is in-memory DTO, not serialized as envelope on wire) + // Response round-trip — RawResponseData holds the typed object directly (no intermediate byte[]) var serviceResult = $"{serverParams[0]}"; - var responseData = SignalRSerializationHelper.CreateResponseData(serviceResult, AyCode.Core.Serializers.Binaries.AcBinarySerializerOptions.Default); var clientResponse = new SignalResponseDataMessage { MessageTag = 100, Status = SignalResponseStatus.Success, DataSerializerType = AyCode.Core.Serializers.AcSerializerType.Binary, - ResponseData = responseData != null ? new SignalData(responseData) : null + RawResponseData = serviceResult }; var finalResult = clientResponse.GetResponseData(); Assert.AreEqual(testValue.ToString(), finalResult); diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 0d5944c..b88474f 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -2,6 +2,7 @@ using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Text; using AyCode.Core.Serializers.Binaries; using Microsoft.AspNetCore.Connections; @@ -24,9 +25,11 @@ namespace AyCode.Services.SignalRs; /// Arguments are serialized individually with an INT32 length prefix each, /// enabling deferred deserialization via IHubProtocol's binder pattern. /// -/// All writes go through BufferWriterBinaryOutput for zero virtual dispatch -/// on the hot path. Argument payloads are serialized directly to the pipe -/// via AcBinarySerializer (zero-copy). Length prefixes are patched in-place. +/// Write path: BufferWriterBinaryOutput for zero virtual dispatch on the hot path. +/// Argument payloads serialized directly to the pipe via AcBinarySerializer (zero-copy write). +/// +/// Read path: SequenceReader<byte> reads directly from the pipe's ReadOnlySequence. +/// Argument deserialization uses the pipe's backing byte[] via TryGetArray (zero-copy read). /// public class AcBinaryHubProtocol : IHubProtocol { @@ -45,6 +48,13 @@ public class AcBinaryHubProtocol : IHubProtocol protected volatile AcBinarySerializerOptions _options; + /// + /// Parsed SignalParams from current message (arg[2]). + /// Used by ReadSingleArgument (arg[3]) for type-aware deserialization. + /// Thread-safe: SignalR processes messages sequentially per connection. + /// + private SignalParams? _currentSignalParams; + public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } public AcBinaryHubProtocol(AcBinarySerializerOptions options) @@ -202,74 +212,41 @@ public class AcBinaryHubProtocol : IHubProtocol { message = null; - if (input.Length < LengthPrefixSize) + var reader = new SequenceReader(input); + if (!reader.TryReadLittleEndian(out int payloadLength)) return false; - int payloadLength; - if (input.FirstSpan.Length >= LengthPrefixSize) - { - payloadLength = Unsafe.ReadUnaligned(ref Unsafe.AsRef(in input.FirstSpan[0])); - } - else - { - Span lenBuf = stackalloc byte[LengthPrefixSize]; - input.Slice(0, LengthPrefixSize).CopyTo(lenBuf); - payloadLength = Unsafe.ReadUnaligned(ref lenBuf[0]); - } - - var totalLength = LengthPrefixSize + payloadLength; - if (input.Length < totalLength) + if (reader.Remaining < payloadLength) return false; - var payload = input.Slice(LengthPrefixSize, payloadLength); + _currentSignalParams = null; + message = ParseMessage(ref reader, payloadLength, binder); - ReadOnlySpan span; - byte[]? rentedBuffer = null; - - if (payload.IsSingleSegment) - { - span = payload.FirstSpan; - } - else - { - rentedBuffer = ArrayPool.Shared.Rent(payloadLength); - payload.CopyTo(rentedBuffer); - span = rentedBuffer.AsSpan(0, payloadLength); - } - - try - { - message = ParseMessage(span, binder); - } - finally - { - if (rentedBuffer != null) - ArrayPool.Shared.Return(rentedBuffer); - } - - input = input.Slice(totalLength); + input = input.Slice(LengthPrefixSize + payloadLength); return message != null; } - private HubMessage? ParseMessage(ReadOnlySpan span, IInvocationBinder binder) + private HubMessage? ParseMessage(ref SequenceReader r, int payloadLength, IInvocationBinder binder) { - if (span.Length == 0) + if (payloadLength == 0) return null; - var reader = new SpanReader(span); - var msgType = reader.ReadByte(); + // Mark end position so Parse* methods can check Remaining relative to payload + var payloadEnd = r.Consumed + payloadLength; + + r.TryRead(out byte msgType); return msgType switch { - MsgInvocation => ParseInvocation(ref reader, binder), - MsgStreamInvocation => ParseStreamInvocation(ref reader, binder), - MsgStreamItem => ParseStreamItem(ref reader, binder), - MsgCompletion => ParseCompletion(ref reader, binder), - MsgCancelInvocation => ParseCancelInvocation(ref reader), + MsgInvocation => ParseInvocation(ref r, binder), + MsgStreamInvocation => ParseStreamInvocation(ref r, binder), + MsgStreamItem => ParseStreamItem(ref r, binder), + MsgCompletion => ParseCompletion(ref r, binder), + MsgCancelInvocation => ParseCancelInvocation(ref r), MsgPing => PingMessage.Instance, - MsgClose => ParseClose(ref reader), - MsgAck => new AckMessage(reader.ReadInt64()), - MsgSequence => new SequenceMessage(reader.ReadInt64()), + MsgClose => ParseClose(ref r), + MsgAck => new AckMessage(ReadInt64(ref r)), + MsgSequence => new SequenceMessage(ReadInt64(ref r)), _ => null }; } @@ -284,7 +261,7 @@ public class AcBinaryHubProtocol : IHubProtocol private static void LogDiagnostic(string message) => DiagnosticLogger?.Invoke(message); [Conditional("DEBUG")] - private static void LogParseInvocation(string target, IReadOnlyList paramTypes, int remaining) + private static void LogParseInvocation(string target, IReadOnlyList paramTypes, long remaining) { if (DiagnosticLogger == null) return; var typeNames = new string[paramTypes.Count]; @@ -292,16 +269,16 @@ public class AcBinaryHubProtocol : IHubProtocol DiagnosticLogger($"[AcBinaryHubProtocol] ParseInvocation target='{target}'; paramTypes.Count={paramTypes.Count}; types=[{string.Join(", ", typeNames)}]; remaining={remaining}"); } - private HubMessage ParseInvocation(ref SpanReader r, IInvocationBinder binder) + private HubMessage ParseInvocation(ref SequenceReader r, IInvocationBinder binder) { - var invocationId = r.ReadNullableString(); - var target = r.ReadString(); + var invocationId = ReadNullableString(ref r); + var target = ReadString(ref r); var paramTypes = binder.GetParameterTypes(target); LogParseInvocation(target, paramTypes, r.Remaining); var args = ReadArguments(ref r, paramTypes); - var streamIds = r.ReadStringArray(); + var streamIds = ReadStringArray(ref r); var headers = ReadHeaders(ref r); var msg = streamIds is { Length: > 0 } @@ -314,13 +291,13 @@ public class AcBinaryHubProtocol : IHubProtocol return msg; } - private HubMessage ParseStreamInvocation(ref SpanReader r, IInvocationBinder binder) + private HubMessage ParseStreamInvocation(ref SequenceReader r, IInvocationBinder binder) { - var invocationId = r.ReadString(); - var target = r.ReadString(); + var invocationId = ReadString(ref r); + var target = ReadString(ref r); var paramTypes = binder.GetParameterTypes(target); var args = ReadArguments(ref r, paramTypes); - var streamIds = r.ReadStringArray(); + var streamIds = ReadStringArray(ref r); var headers = ReadHeaders(ref r); var msg = new StreamInvocationMessage(invocationId, target, args, streamIds); @@ -330,9 +307,9 @@ public class AcBinaryHubProtocol : IHubProtocol return msg; } - private HubMessage ParseStreamItem(ref SpanReader r, IInvocationBinder binder) + private HubMessage ParseStreamItem(ref SequenceReader r, IInvocationBinder binder) { - var invocationId = r.ReadString(); + var invocationId = ReadString(ref r); var itemType = binder.GetStreamItemType(invocationId); var item = ReadSingleArgument(ref r, itemType); var headers = ReadHeaders(ref r); @@ -344,11 +321,12 @@ public class AcBinaryHubProtocol : IHubProtocol return msg; } - private HubMessage ParseCompletion(ref SpanReader r, IInvocationBinder binder) + private HubMessage ParseCompletion(ref SequenceReader r, IInvocationBinder binder) { - var invocationId = r.ReadString(); - var error = r.ReadNullableString(); - var hasResult = r.ReadByte() == 1; + var invocationId = ReadString(ref r); + var error = ReadNullableString(ref r); + r.TryRead(out byte hasResultByte); + var hasResult = hasResultByte == 1; object? result = null; if (hasResult) @@ -373,9 +351,9 @@ public class AcBinaryHubProtocol : IHubProtocol return msg; } - private static HubMessage ParseCancelInvocation(ref SpanReader r) + private static HubMessage ParseCancelInvocation(ref SequenceReader r) { - var invocationId = r.ReadString(); + var invocationId = ReadString(ref r); var headers = ReadHeaders(ref r); var msg = new CancelInvocationMessage(invocationId); @@ -385,10 +363,11 @@ public class AcBinaryHubProtocol : IHubProtocol return msg; } - private static HubMessage ParseClose(ref SpanReader r) + private static HubMessage ParseClose(ref SequenceReader r) { - var error = r.ReadNullableString(); - var allowReconnect = r.Remaining > 0 && r.ReadByte() == 1; + var error = ReadNullableString(ref r); + r.TryRead(out byte reconnectByte); + var allowReconnect = reconnectByte == 1; return new CloseMessage(error, allowReconnect); } @@ -416,18 +395,6 @@ public class AcBinaryHubProtocol : IHubProtocol return; } - if (value is SignalData signalData) - { - // SignalData fast-path: same wire format as byte[], reads from Span - var span = signalData.Span; - var argPayload = 1 + VarUIntSize((uint)span.Length) + span.Length; - bw.WriteRaw(argPayload); - bw.WriteByte(BinaryTypeCode.ByteArray); - bw.WriteVarUInt((uint)span.Length); - bw.WriteBytes(span); - return; - } - // Flush BWO to pipe, then serialize directly to the pipe via AcBinarySerializer bw.FlushAndReset(); @@ -441,9 +408,9 @@ public class AcBinaryHubProtocol : IHubProtocol externalBytes += LengthPrefixSize + argBytes; } - private object?[] ReadArguments(ref SpanReader r, IReadOnlyList paramTypes) + private object?[] ReadArguments(ref SequenceReader r, IReadOnlyList paramTypes) { - var count = (int)r.ReadVarUInt(); + var count = (int)ReadVarUInt(ref r); LogDiagnostic($"[AcBinaryHubProtocol] ReadArguments count={count}; remaining={r.Remaining}"); @@ -456,54 +423,65 @@ public class AcBinaryHubProtocol : IHubProtocol LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}"); args[i] = ReadSingleArgument(ref r, targetType); + + // Capture parsed SignalParams for type-aware deserialization of subsequent args + if (args[i] is SignalParams sp) + _currentSignalParams = sp; } return args; } - private object? ReadSingleArgument(ref SpanReader r, Type targetType) + /// + /// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer. + /// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[]. + /// SignalDataType enables eager deserialization of response data to the server's actual type. + /// + private object? ReadSingleArgument(ref SequenceReader r, Type targetType) { - var argLength = r.ReadInt32(); + r.TryReadLittleEndian(out int argLength); if (argLength == 0) return null; - var argSpan = r.ReadSpan(argLength); - - if (argLength == 1 && argSpan[0] == 0) - return null; - - // byte[] fast-path: bypass deserializer engine. - // Check wire format only — ByteArray marker (0x44) is unambiguous: - // no AcBinary-serialized object starts with it (they start with version=1). - // Removing the targetType check makes the protocol robust against - // client/server argument order mismatches for byte[] arguments. - if (argSpan.Length > 0 && argSpan[0] == BinaryTypeCode.ByteArray) + // Null marker check + if (argLength == 1) { - var byteReader = new SpanReader(argSpan.Slice(1)); - var len = (int)byteReader.ReadVarUInt(); - var payloadSpan = byteReader.ReadSpan(len); - // Skip virtual dispatch for plain byte[] (most common case — SignalParams.Parameters). - // Only call virtual hook when targetType is not byte[] (e.g. SignalData). - return targetType == typeof(byte[]) || targetType == typeof(object) - ? payloadSpan.ToArray() - : CreateByteArrayResult(payloadSpan, targetType); + r.TryPeek(out byte marker); + if (marker == 0) { r.Advance(1); return null; } } - return AcBinaryDeserializer.Deserialize(argSpan, targetType, _options); + // Slice argument from pipe sequence — zero-copy reference + var argSlice = r.UnreadSequence.Slice(0, argLength); + r.Advance(argLength); + + // Response data: resolve actual type from SignalDataType for eager deserialization + if (targetType == typeof(object) && _currentSignalParams?.SignalDataType != null) + { + var dataType = Type.GetType(_currentSignalParams.SignalDataType); + if (dataType != null) + targetType = dataType; + } + + return DeserializeFromSequence(argSlice, targetType, _options); } /// - /// Hook for derived protocols to customize byte[] argument creation. - /// Called from the byte[] fast-path (ByteArray wire marker 0x44). - /// Base implementation: allocates new byte[] via .ToArray(). - /// Override to use ArrayPool, return SignalData, etc. + /// Deserializes from a ReadOnlySequence, using the pipe's backing byte[] when possible (zero-copy). + /// Only copies for rare multi-segment arguments that span pipe buffer boundaries. /// - protected virtual object CreateByteArrayResult(ReadOnlySpan data, Type targetType) - => data.ToArray(); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static object? DeserializeFromSequence(ReadOnlySequence data, Type targetType, AcBinarySerializerOptions options) + { + if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg)) + return AcBinaryDeserializer.Deserialize(seg.Array!, seg.Offset, (int)data.Length, targetType, options); + + var bytes = data.ToArray(); + return AcBinaryDeserializer.Deserialize(bytes, 0, bytes.Length, targetType, options); + } #endregion - #region Framing Helpers + #region Write Framing Helpers [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value) @@ -556,6 +534,82 @@ public class AcBinaryHubProtocol : IHubProtocol #endregion + #region Sequence Read Helpers + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static long ReadInt64(ref SequenceReader r) + { + r.TryReadLittleEndian(out long v); + return v; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint ReadVarUInt(ref SequenceReader r) + { + uint value = 0; + var shift = 0; + while (r.TryRead(out byte b)) + { + value |= (uint)(b & 0x7F) << shift; + if ((b & 0x80) == 0) + return value; + shift += 7; + } + return value; + } + + private static string ReadString(ref SequenceReader r) + { + var byteCount = (int)ReadVarUInt(ref r); + if (byteCount == 0) + return string.Empty; + + r.TryReadExact(byteCount, out var bytes); + return bytes.IsSingleSegment + ? Encoding.UTF8.GetString(bytes.FirstSpan) + : Encoding.UTF8.GetString(bytes.ToArray()); + } + + private static string? ReadNullableString(ref SequenceReader r) + { + r.TryRead(out byte marker); + return marker == 0 ? null : ReadString(ref r); + } + + private static string[]? ReadStringArray(ref SequenceReader r) + { + var count = (int)ReadVarUInt(ref r); + if (count == 0) + return null; + + var array = new string[count]; + for (var i = 0; i < count; i++) + array[i] = ReadString(ref r); + return array; + } + + private static Dictionary? ReadHeaders(ref SequenceReader r) + { + if (r.Remaining == 0) + return null; + + var count = (int)ReadVarUInt(ref r); + if (count == 0) + return null; + + var headers = new Dictionary(count, StringComparer.Ordinal); + for (var i = 0; i < count; i++) + { + var key = ReadString(ref r); + var value = ReadString(ref r); + headers[key] = value; + } + + return headers; + } + + #endregion + #region Helpers private static InvocationMessage ApplyInvocationId(InvocationMessage msg, string? invocationId) @@ -571,120 +625,5 @@ public class AcBinaryHubProtocol : IHubProtocol invMsg.Headers = headers; } - private static Dictionary? ReadHeaders(ref SpanReader r) - { - if (r.Remaining == 0) - return null; - - var count = (int)r.ReadVarUInt(); - if (count == 0) - return null; - - var headers = new Dictionary(count, StringComparer.Ordinal); - for (var i = 0; i < count; i++) - { - var key = r.ReadString(); - var value = r.ReadString(); - headers[key] = value; - } - - return headers; - } - - #endregion - - #region SpanReader - - /// - /// Lightweight ref struct for sequential reading from a ReadOnlySpan. - /// - private ref struct SpanReader - { - private readonly ReadOnlySpan _span; - private int _pos; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public SpanReader(ReadOnlySpan span) - { - _span = span; - _pos = 0; - } - - public int Remaining - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get => _span.Length - _pos; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public byte ReadByte() => _span[_pos++]; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public int ReadInt32() - { - var value = Unsafe.ReadUnaligned(ref Unsafe.AsRef(in _span[_pos])); - _pos += 4; - return value; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public long ReadInt64() - { - var value = Unsafe.ReadUnaligned(ref Unsafe.AsRef(in _span[_pos])); - _pos += 8; - return value; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public uint ReadVarUInt() - { - uint value = 0; - var shift = 0; - while (true) - { - var b = _span[_pos++]; - value |= (uint)(b & 0x7F) << shift; - if ((b & 0x80) == 0) - return value; - shift += 7; - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public ReadOnlySpan ReadSpan(int length) - { - var result = _span.Slice(_pos, length); - _pos += length; - return result; - } - - public string ReadString() - { - var byteCount = (int)ReadVarUInt(); - if (byteCount == 0) - return string.Empty; - var bytes = ReadSpan(byteCount); - return Encoding.UTF8.GetString(bytes); - } - - public string? ReadNullableString() - { - var marker = ReadByte(); - return marker == 0 ? null : ReadString(); - } - - public string[]? ReadStringArray() - { - var count = (int)ReadVarUInt(); - if (count == 0) - return null; - - var array = new string[count]; - for (var i = 0; i < count; i++) - array[i] = ReadString(); - return array; - } - } - #endregion } diff --git a/AyCode.Services/SignalRs/AcSignalRClientBase.cs b/AyCode.Services/SignalRs/AcSignalRClientBase.cs index 180f149..866b347 100644 --- a/AyCode.Services/SignalRs/AcSignalRClientBase.cs +++ b/AyCode.Services/SignalRs/AcSignalRClientBase.cs @@ -21,7 +21,7 @@ namespace AyCode.Services.SignalRs protected readonly HubConnection? HubConnection; protected readonly AcLoggerBase Logger; - protected abstract Task MessageReceived(int messageTag, SignalParams signalParams, SignalData data); + protected abstract Task MessageReceived(int messageTag, SignalParams signalParams, object data); public int MsDelay = 25; public int MsFirstDelay = 50; @@ -70,7 +70,7 @@ namespace AyCode.Services.SignalRs HubConnection = hubBuilder.Build(); HubConnection.Closed += HubConnection_Closed; - _ = HubConnection.On(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage); + _ = HubConnection.On(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage); } protected AcSignalRClientBase(AcLoggerBase logger) @@ -105,8 +105,8 @@ namespace AyCode.Services.SignalRs protected virtual ValueTask DisposeConnectionInternal() => HubConnection?.DisposeAsync() ?? ValueTask.CompletedTask; - protected virtual Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, byte[]? messageBytes) - => HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, requestId, signalParams, messageBytes) ?? Task.CompletedTask; + protected virtual Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, object? data) + => HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, requestId, signalParams, data) ?? Task.CompletedTask; #endregion @@ -416,29 +416,24 @@ namespace AyCode.Services.SignalRs protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32; - public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, SignalData data) + public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, object data) { var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}"; - if (data.IsEmpty) Logger.Warning($"data.IsEmpty! {logText}"); - try { if (requestId.HasValue && _responseByRequestId.TryGetValue(requestId.Value, out var requestModel)) { var reqId = requestId.Value; requestModel.ResponseDateTime = DateTime.UtcNow; - Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms][{data.Length / 1024}kb]{logText}"); + Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms]{logText}"); - // Diagnostic logging for binary deserialization debugging - LogBinaryDiagnostics(messageTag, data); - - // No envelope deserialization — construct directly from params + data + // Protocol already deserialized data to typed object or byte[] var responseMessage = new SignalResponseDataMessage { Status = signalParams.Status, DataSerializerType = signalParams.DataSerializerType, - ResponseData = data + RawResponseData = data }; switch (requestModel.ResponseByRequestId) @@ -474,12 +469,6 @@ namespace AyCode.Services.SignalRs } catch (Exception ex) { - // Enhanced error logging with binary diagnostics - if (!data.IsEmpty) - { - LogBinaryDiagnosticsOnError(messageTag, data, ex); - } - if (requestId.HasValue && _responseByRequestId.TryRemove(requestId.Value, out var exModel)) SignalRRequestModelPool.Return(exModel); @@ -490,105 +479,5 @@ namespace AyCode.Services.SignalRs return Task.CompletedTask; } - /// - /// Enable diagnostic logging for binary deserialization debugging. - /// Set to true to log hex dumps of received binary data. - /// - public bool EnableBinaryDiagnostics { get; set; } = false; - - /// - /// Logs binary diagnostics for debugging serialization issues. - /// - private void LogBinaryDiagnostics(int messageTag, SignalData data) - { - if (!EnableBinaryDiagnostics || data.IsEmpty) return; - - try - { - var span = data.Span; - var hexDump = Convert.ToHexString(span[..Math.Min(500, span.Length)]); - Logger.Info($"=== BINARY DIAGNOSTICS === Tag: {messageTag}; Length: {data.Length}"); - Logger.Info($"HEX (first 500 bytes): {hexDump}"); - - // Parse header info - if (span.Length >= 3) - { - var version = span[0]; - var marker = span[1]; - Logger.Info($"Version: {version}; Marker: 0x{marker:X2}"); - - if ((marker & 0x10) != 0 && span.Length > 2) - { - var propCount = span[2]; - Logger.Info($"Header property count: {propCount}"); - - // Parse first 10 property names - var pos = 3; - for (int i = 0; i < Math.Min((int)propCount, 10) && pos < span.Length; i++) - { - var strLen = span[pos++]; - if (pos + strLen <= span.Length) - { - var propName = System.Text.Encoding.UTF8.GetString(span.Slice(pos, strLen)); - pos += strLen; - Logger.Info($" [{i}]: '{propName}'"); - } - } - } - } - } - catch (Exception ex) - { - Logger.Warning($"Failed to log binary diagnostics: {ex.Message}"); - } - } - - /// - /// Logs binary diagnostics when an error occurs during deserialization. - /// - private void LogBinaryDiagnosticsOnError(int messageTag, SignalData data, Exception error) - { - try - { - var span = data.Span; - Logger.Error($"=== BINARY DESERIALIZATION ERROR ==="); - Logger.Error($"Tag: {messageTag}; Length: {data.Length}"); - Logger.Error($"Error: {error.Message}"); - - var hexDump = Convert.ToHexString(span[..Math.Min(1000, span.Length)]); - Logger.Error($"HEX (first 1000 bytes): {hexDump}"); - - // Parse header info - if (span.Length >= 3) - { - var version = span[0]; - var marker = span[1]; - Logger.Error($"Version: {version}; Marker: 0x{marker:X2}"); - - if ((marker & 0x10) != 0 && span.Length > 2) - { - var propCount = span[2]; - Logger.Error($"Header property count: {propCount}"); - - // Parse ALL property names - var pos = 3; - for (int i = 0; i < propCount && pos < span.Length; i++) - { - var strLen = span[pos++]; - if (pos + strLen <= span.Length) - { - var propName = System.Text.Encoding.UTF8.GetString(span.Slice(pos, strLen)); - pos += strLen; - Logger.Error($" Header[{i}]: '{propName}'"); - } - } - } - } - } - catch (Exception ex) - { - Logger.Warning($"Failed to log binary diagnostics on error: {ex.Message}"); - } - } } } diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs index a49a9ce..d86ca69 100644 --- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs @@ -1,27 +1,13 @@ -using System.Buffers; using AyCode.Core.Serializers.Binaries; namespace AyCode.Services.SignalRs; /// -/// Project-specific binary protocol. Uses ArrayPool for byte[] arguments -/// when the target type is SignalData (client receive path optimization). +/// Project-specific binary protocol. /// Register this in PluginNopStartup.cs and AcSignalRClientBase instead of AcBinaryHubProtocol. /// public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol { public AyCodeBinaryHubProtocol() { } public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options) : base(options) { } - - protected override object CreateByteArrayResult(ReadOnlySpan data, Type targetType) - { - if (targetType == typeof(SignalData)) - { - var rented = ArrayPool.Shared.Rent(data.Length); - data.CopyTo(rented); - return new SignalData(rented, data.Length, isRented: true); - } - - return base.CreateByteArrayResult(data, targetType); - } } diff --git a/AyCode.Services/SignalRs/IAcSignalRHubBase.cs b/AyCode.Services/SignalRs/IAcSignalRHubBase.cs index d2a5302..4bb3c2c 100644 --- a/AyCode.Services/SignalRs/IAcSignalRHubBase.cs +++ b/AyCode.Services/SignalRs/IAcSignalRHubBase.cs @@ -3,5 +3,5 @@ public interface IAcSignalRHubBase { //Task OnRequestMessage(int messageTag, int requestId); - Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, SignalData data); + Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, object data); } \ No newline at end of file diff --git a/AyCode.Services/SignalRs/IAcSignalRHubClient.cs b/AyCode.Services/SignalRs/IAcSignalRHubClient.cs index e4c2450..7619acf 100644 --- a/AyCode.Services/SignalRs/IAcSignalRHubClient.cs +++ b/AyCode.Services/SignalRs/IAcSignalRHubClient.cs @@ -1,12 +1,8 @@ using AyCode.Core.Extensions; using AyCode.Core.Interfaces; -using System.Buffers; -using System.Runtime.CompilerServices; -using AyCode.Core.Serializers.Binaries; -using AyCode.Core.Serializers.Jsons; +using AyCode.Core.Serializers; using JsonIgnoreAttribute = Newtonsoft.Json.JsonIgnoreAttribute; using STJIgnore = System.Text.Json.Serialization.JsonIgnoreAttribute; -using AyCode.Core.Serializers; namespace AyCode.Services.SignalRs; @@ -146,23 +142,23 @@ public enum SignalResponseStatus : byte /// JSON mode uses GZip compression for reduced payload size. /// Optimized: uses pooled buffers for decompression, zero-copy deserialization path. /// -public sealed class SignalResponseDataMessage : ISignalResponseMessage, IDisposable +/// +/// Lightweight response container for client request-response pipeline and stream wire format. +/// Main send path does NOT use this — server sends (signalParams + object) directly. +/// Used by: (1) client OnReceiveMessage → stores in requestModel, (2) stream path (serialized as blob). +/// +public sealed class SignalResponseDataMessage : ISignalResponseMessage { public int MessageTag { get; set; } public SignalResponseStatus Status { get; set; } public AcSerializerType DataSerializerType { get; set; } - public SignalData? ResponseData { get; set; } - - [JsonIgnore] [STJIgnore] private object? _cachedResponseData; - [JsonIgnore] [STJIgnore] private byte[]? _rentedDecompressedBuffer; - [JsonIgnore] [STJIgnore] private int _decompressedLength; /// - /// Enable diagnostic logging for ResponseData deserialization. - /// When set, logs hex dump and header info before deserialization. + /// Raw response object — on client: protocol-deserialized typed object or byte[]. + /// On server (stream path only): raw object for blob serialization. /// - [JsonIgnore] [STJIgnore] - public static Action? DiagnosticLogger { get; set; } + [JsonIgnore] [STJIgnore] + public object? RawResponseData { get; set; } public SignalResponseDataMessage() { } @@ -172,277 +168,23 @@ public sealed class SignalResponseDataMessage : ISignalResponseMessage, IDisposa Status = status; } + /// Stream path constructor: stores raw object for blob serialization. public SignalResponseDataMessage(int messageTag, SignalResponseStatus status, object? responseData, AcSerializerOptions serializerOptions) : this(messageTag, status) { DataSerializerType = serializerOptions.SerializerType; - var bytes = SignalRSerializationHelper.CreateResponseData(responseData, serializerOptions); - ResponseData = bytes != null ? new SignalData(bytes) : null; + RawResponseData = responseData; } /// - /// Deserializes the ResponseData to the specified type. - /// Uses cached result for repeated calls. + /// Extracts response data as T. + /// Protocol eagerly deserializes via SignalDataType → RawResponseData is typed object → direct cast. + /// Consumer's responsibility to handle byte[] if T is byte[]. /// public T? GetResponseData() { - if (_cachedResponseData != null) return (T)_cachedResponseData; - if (ResponseData == null || ResponseData.IsEmpty) return default; - - try - { - if (DataSerializerType == AcSerializerType.Binary) - { - // Log diagnostics if enabled - LogResponseDataDiagnostics(); - - return (T)(_cachedResponseData = AcBinaryDeserializer.Deserialize(ResponseData.Span)!); - } - - // Decompress GZip to pooled buffer and deserialize directly - EnsureDecompressed(); - - var result = AcJsonDeserializer.Deserialize(new ReadOnlySpan(_rentedDecompressedBuffer, 0, _decompressedLength)); - _cachedResponseData = result; - return result; - } - catch (Exception ex) - { - // Log detailed error diagnostics - LogResponseDataError(ex); - throw; - } - } - - private void LogResponseDataDiagnostics() - { - if (DiagnosticLogger == null || ResponseData == null) return; - - try - { - var targetType = typeof(T); - DiagnosticLogger($"=== RESPONSE DATA DIAGNOSTICS (DESERIALIZE) ==="); - DiagnosticLogger($"Target Type: {targetType.Name}"); - DiagnosticLogger($"Target FullName: {targetType.FullName}"); - DiagnosticLogger($"Target Namespace: {targetType.Namespace}"); - DiagnosticLogger($"Target Assembly: {targetType.Assembly.GetName().Name} v{targetType.Assembly.GetName().Version}"); - DiagnosticLogger($"Target AssemblyQualifiedName: {targetType.AssemblyQualifiedName}"); - DiagnosticLogger($"Target Assembly Location: {targetType.Assembly.Location}"); - - // Log element type for collections - if (targetType.IsGenericType) - { - var genericArgs = targetType.GetGenericArguments(); - DiagnosticLogger($"Generic Arguments: [{string.Join(", ", genericArgs.Select(t => t.FullName))}]"); - if (genericArgs.Length == 1) - { - var elementType = genericArgs[0]; - DiagnosticLogger($"--- ELEMENT TYPE INFO ---"); - DiagnosticLogger($"Element Type: {elementType.Name}"); - DiagnosticLogger($"Element FullName: {elementType.FullName}"); - DiagnosticLogger($"Element Namespace: {elementType.Namespace}"); - DiagnosticLogger($"Element Assembly: {elementType.Assembly.GetName().Name} v{elementType.Assembly.GetName().Version}"); - DiagnosticLogger($"Element AssemblyQualifiedName: {elementType.AssemblyQualifiedName}"); - DiagnosticLogger($"Element Assembly Location: {elementType.Assembly.Location}"); - DiagnosticLogger($"Element BaseType: {elementType.BaseType?.FullName ?? "null"}"); - - // Log inheritance chain - var baseType = elementType.BaseType; - var inheritanceChain = new List(); - while (baseType != null && baseType != typeof(object)) - { - inheritanceChain.Add($"{baseType.Name} ({baseType.Assembly.GetName().Name})"); - baseType = baseType.BaseType; - } - if (inheritanceChain.Count > 0) - { - DiagnosticLogger($"Element Inheritance: {string.Join(" -> ", inheritanceChain)}"); - } - - LogTypeProperties(elementType, "Element"); - } - } - else - { - DiagnosticLogger($"BaseType: {targetType.BaseType?.FullName ?? "null"}"); - LogTypeProperties(targetType, "Target"); - } - - DiagnosticLogger($"ResponseData.Length: {ResponseData.Length}"); - DiagnosticLogger($"HEX (first 500 bytes): {Convert.ToHexString(ResponseData.Span[..Math.Min(500, ResponseData.Length)])}"); - - // Parse header with VarInt support - LogBinaryHeader(ResponseData.Span); - } - catch (Exception ex) - { - DiagnosticLogger($"Failed to log diagnostics: {ex.Message}"); - } - } - - private static void LogTypeProperties(Type type, string prefix) - { - if (DiagnosticLogger == null) return; - - var props = type.GetProperties(System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance) - .Where(p => p.CanRead && p.GetIndexParameters().Length == 0) - .ToArray(); - - // Log in declaration order (not alphabetically) to match serialization order - DiagnosticLogger($"{prefix} Property Count: {props.Length}"); - for (var i = 0; i < props.Length; i++) - { - var p = props[i]; - var declaringType = p.DeclaringType?.Name ?? "?"; - var declaringAssembly = p.DeclaringType?.Assembly.GetName().Name ?? "?"; - DiagnosticLogger($" {prefix}[{i}]: {p.Name} : {p.PropertyType.Name} (declared in {declaringType} @ {declaringAssembly})"); - } - } - - private static void LogBinaryHeader(ReadOnlySpan data) - { - if (DiagnosticLogger == null || data.Length < 3) return; - - var version = data[0]; - var marker = data[1]; - DiagnosticLogger($"Binary Version: {version}; Marker: 0x{marker:X2}"); - - // Check if metadata flag is set - if ((marker & 0x10) == 0) - { - DiagnosticLogger("Header: No metadata (property names inline)"); - return; - } - - // Read property count as VarUInt - var pos = 2; - var (propCount, bytesRead) = ReadVarUIntFromSpan(data[pos..]); - pos += bytesRead; - - DiagnosticLogger($"Header Property Count: {propCount}"); - - for (var i = 0; i < (int)propCount && pos < data.Length; i++) - { - // Read string length as VarUInt - var (strLen, strLenBytes) = ReadVarUIntFromSpan(data[pos..]); - pos += strLenBytes; - - if (pos + (int)strLen <= data.Length) - { - var propName = System.Text.Encoding.UTF8.GetString(data.Slice(pos, (int)strLen)); - pos += (int)strLen; - DiagnosticLogger($" Header[{i}]: '{propName}'"); - } - else - { - DiagnosticLogger($" Header[{i}]: "); - break; - } - } - } - - private static (uint value, int bytesRead) ReadVarUIntFromSpan(ReadOnlySpan span) - { - uint value = 0; - var shift = 0; - var bytesRead = 0; - - while (bytesRead < span.Length) - { - var b = span[bytesRead++]; - value |= (uint)(b & 0x7F) << shift; - if ((b & 0x80) == 0) - break; - shift += 7; - if (shift > 35) - break; - } - - return (value, bytesRead); - } - - private void LogResponseDataError(Exception error) - { - if (DiagnosticLogger == null || ResponseData == null) return; - - try - { - var targetType = typeof(T); - DiagnosticLogger($"=== RESPONSE DATA DESERIALIZATION ERROR ==="); - DiagnosticLogger($"Error: {error.Message}"); - DiagnosticLogger($"Target Type: {targetType.Name}"); - DiagnosticLogger($"Target FullName: {targetType.FullName}"); - DiagnosticLogger($"Target Namespace: {targetType.Namespace}"); - DiagnosticLogger($"Target Assembly: {targetType.Assembly.GetName().Name} v{targetType.Assembly.GetName().Version}"); - DiagnosticLogger($"Target AssemblyQualifiedName: {targetType.AssemblyQualifiedName}"); - - // Log element type for collections - if (targetType.IsGenericType) - { - var genericArgs = targetType.GetGenericArguments(); - DiagnosticLogger($"Generic Arguments: [{string.Join(", ", genericArgs.Select(t => t.FullName))}]"); - if (genericArgs.Length == 1) - { - var elementType = genericArgs[0]; - DiagnosticLogger($"Element Type: {elementType.FullName}"); - DiagnosticLogger($"Element Assembly: {elementType.Assembly.GetName().Name}"); - LogTypeProperties(elementType, "Element"); - } - } - else - { - LogTypeProperties(targetType, "Target"); - } - - DiagnosticLogger($"ResponseData.Length: {ResponseData.Length}"); - DiagnosticLogger($"HEX (first 1000 bytes): {Convert.ToHexString(ResponseData.Span[..Math.Min(1000, ResponseData.Length)])}"); - - // Parse header - LogBinaryHeader(ResponseData.Span); - - // Log inner exception if present - if (error.InnerException != null) - { - DiagnosticLogger($"Inner Exception: {error.InnerException.Message}"); - } - - // Log stack trace - DiagnosticLogger($"Stack Trace: {error.StackTrace}"); - } - catch (Exception ex) - { - DiagnosticLogger?.Invoke($"Failed to log error diagnostics: {ex.Message}"); - } - } - - /// - /// Gets the decompressed JSON bytes as a ReadOnlySpan for direct processing. - /// - public ReadOnlySpan GetDecompressedJsonSpan() - { - if (ResponseData == null || ResponseData.IsEmpty) return ReadOnlySpan.Empty; - if (DataSerializerType == AcSerializerType.Binary) return ReadOnlySpan.Empty; - - EnsureDecompressed(); - return _rentedDecompressedBuffer.AsSpan(0, _decompressedLength); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void EnsureDecompressed() - { - if (_rentedDecompressedBuffer != null) return; - - (_rentedDecompressedBuffer, _decompressedLength) = AyCode.Core.Compression.GzipHelper.DecompressToRentedBuffer(ResponseData!.Span); - } - - public void Dispose() - { - ResponseData?.Dispose(); - if (_rentedDecompressedBuffer != null) - { - ArrayPool.Shared.Return(_rentedDecompressedBuffer, clearArray: true); - _rentedDecompressedBuffer = null; - } + if (RawResponseData is T typed) return typed; + return default; } } diff --git a/AyCode.Services/SignalRs/ISignalParams.cs b/AyCode.Services/SignalRs/ISignalParams.cs index 6455e0d..710e968 100644 --- a/AyCode.Services/SignalRs/ISignalParams.cs +++ b/AyCode.Services/SignalRs/ISignalParams.cs @@ -34,6 +34,13 @@ public class SignalParams : ISignalParams /// public byte[]? Parameters { get; set; } + /// + /// AssemblyQualifiedName of the response data type. + /// Set by server before sending. Protocol uses this to deserialize directly to the target type. + /// null = raw byte[] (populate/merge path). + /// + public string? SignalDataType { get; set; } + /// /// Cached deserialized byte[][] from Parameters. /// diff --git a/AyCode.Services/SignalRs/SignalData.cs b/AyCode.Services/SignalRs/SignalData.cs deleted file mode 100644 index b358e83..0000000 --- a/AyCode.Services/SignalRs/SignalData.cs +++ /dev/null @@ -1,52 +0,0 @@ -using System.Buffers; - -namespace AyCode.Services.SignalRs; - -/// -/// Wrapper for byte[] response data with optional ArrayPool lifecycle. -/// Created by AyCodeBinaryHubProtocol for pooled buffers, -/// or directly from byte[] for non-pooled data (server send path). -/// Consumer must Dispose() to return rented buffer. -/// Supports future AsyncEnumerable streaming (per-chunk lifecycle). -/// -public sealed class SignalData : IDisposable -{ - private byte[]? _buffer; - private readonly int _length; - private readonly bool _isRented; - - /// Pooled buffer from ArrayPool (rented, length >= actual data). - public SignalData(byte[] rentedBuffer, int length, bool isRented) - { - _buffer = rentedBuffer; - _length = length; - _isRented = isRented; - } - - /// Non-pooled byte[] (server send, direct creation). - public SignalData(byte[] data) - { - _buffer = data; - _length = data?.Length ?? 0; - _isRented = false; - } - - public ReadOnlySpan Span => _buffer.AsSpan(0, _length); - public int Length => _length; - public bool IsEmpty => _length == 0 || _buffer == null; - - /// - /// Returns a copy as byte[]. Use only when a byte[] is absolutely required. - /// Prefer Span for zero-copy access. - /// - public byte[] ToArray() => Span.ToArray(); - - public void Dispose() - { - if (_isRented && _buffer != null) - { - ArrayPool.Shared.Return(_buffer, clearArray: true); - _buffer = null; - } - } -} diff --git a/AyCode.Services/SignalRs/SignalRSerializationHelper.cs b/AyCode.Services/SignalRs/SignalRSerializationHelper.cs index 882bf0f..acc38a4 100644 --- a/AyCode.Services/SignalRs/SignalRSerializationHelper.cs +++ b/AyCode.Services/SignalRs/SignalRSerializationHelper.cs @@ -89,15 +89,6 @@ public static class SignalRSerializationHelper return data.BinaryTo(); } - /// - /// Deserialize binary data from ReadOnlySpan. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static T? DeserializeFromBinary(ReadOnlySpan data) - { - return data.BinaryTo(); - } - #endregion #region JSON Serialization with Brotli