diff --git a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs index d93695c..5524f39 100644 --- a/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs +++ b/AyCode.Services.Server.Tests/SignalRs/TestableSignalRHub2.cs @@ -115,7 +115,6 @@ public class TestableSignalRHub2 : AcWebSignalRHubBase(messageTag)}"); diff --git a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs index 8e13d7f..99e36ed 100644 --- a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs +++ b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs @@ -326,7 +326,6 @@ public abstract class AcWebSignalRHubBase(IConfiguration { Status = status, DataSerializerType = SerializerOptions.SerializerType, - SignalDataType = isRawBytes ? null : responseData?.GetType().AssemblyQualifiedName, IsRawBytesData = isRawBytes }; diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 13d0831..b6ecce2 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -66,6 +66,14 @@ public class AcBinaryHubProtocol : IHubProtocol /// private readonly ConditionalWeakTable _chunkStates; + /// + /// Opaque context produced by for the currently-parsed message. + /// Set by parse methods (ParseInvocation, ParseStreamInvocation, ParseStreamItem, ParseCompletion) + /// right after reading the per-message header. Derived protocols can read this to customize + /// argument deserialization (e.g., type resolution when targetType == typeof(object)). + /// + protected object? _currentHeaderContext; + private sealed class AsyncChunkState { public HubMessage PartialMessage = null!; @@ -121,6 +129,40 @@ public class AcBinaryHubProtocol : IHubProtocol [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsVersionSupported(int version) => version <= Version; + #region Extensibility Hooks + + /// + /// Called right after the message type byte (both chunked and non-chunked paths). + /// Derived protocols can write extra header fields here (e.g., a type AQN for untyped args). + /// + /// Default implementation writes nothing — base protocol is fully generic and has no per-message + /// extra state. Derived classes must read exactly the same bytes in . + /// + /// + /// Output writer (same one used for the message payload). + /// The message being written. + /// + /// When the chunked path activates, this is the actual argument being streamed (so the derived + /// class can use its concrete runtime type). null for non-chunked messages. + /// + protected virtual void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg) + { + // Base: no extra header. + } + + /// + /// Reads the per-message header written by on the sender side. + /// Called right after the message type byte has been consumed. + /// + /// Returns an opaque context object that is stored in + /// for derived classes to consume during the rest of the parse. + /// Default implementation returns null. + /// + /// + protected virtual object? ReadHeader(ref SequenceReader r) => null; + + #endregion + #region WriteMessage public ReadOnlyMemory GetMessageBytes(HubMessage message) @@ -205,6 +247,7 @@ public class AcBinaryHubProtocol : IHubProtocol private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, InvocationMessage m, ref int externalBytes) { bw.WriteByte(MsgInvocation); + WriteHeader(ref bw, m, streamedArg: null); WriteNullableString(ref bw, m.InvocationId); bw.WriteStringUtf8(m.Target); WriteArguments(ref bw, output, m.Arguments, ref externalBytes); @@ -215,6 +258,7 @@ public class AcBinaryHubProtocol : IHubProtocol private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, StreamInvocationMessage m, ref int externalBytes) { bw.WriteByte(MsgStreamInvocation); + WriteHeader(ref bw, m, streamedArg: null); bw.WriteStringUtf8(m.InvocationId!); bw.WriteStringUtf8(m.Target); WriteArguments(ref bw, output, m.Arguments, ref externalBytes); @@ -225,6 +269,7 @@ public class AcBinaryHubProtocol : IHubProtocol private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter output, StreamItemMessage m, ref int externalBytes) { bw.WriteByte(MsgStreamItem); + WriteHeader(ref bw, m, streamedArg: null); bw.WriteStringUtf8(m.InvocationId!); WriteArgument(ref bw, output, m.Item, ref externalBytes); WriteHeaders(ref bw, m.Headers); @@ -233,6 +278,7 @@ public class AcBinaryHubProtocol : IHubProtocol private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter output, CompletionMessage m, ref int externalBytes) { bw.WriteByte(MsgCompletion); + WriteHeader(ref bw, m, streamedArg: null); bw.WriteStringUtf8(m.InvocationId!); WriteNullableString(ref bw, m.Error); @@ -336,6 +382,7 @@ public class AcBinaryHubProtocol : IHubProtocol { case InvocationMessage m: bw.WriteByte(MsgInvocation); + WriteHeader(ref bw, m, streamedArg); WriteNullableString(ref bw, m.InvocationId); bw.WriteStringUtf8(m.Target); WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes); @@ -345,6 +392,7 @@ public class AcBinaryHubProtocol : IHubProtocol case StreamInvocationMessage m: bw.WriteByte(MsgStreamInvocation); + WriteHeader(ref bw, m, streamedArg); bw.WriteStringUtf8(m.InvocationId!); bw.WriteStringUtf8(m.Target); WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes); @@ -354,6 +402,7 @@ public class AcBinaryHubProtocol : IHubProtocol case StreamItemMessage m: bw.WriteByte(MsgStreamItem); + WriteHeader(ref bw, m, streamedArg); bw.WriteStringUtf8(m.InvocationId!); bw.WriteRaw(-1); // streamed arg marker WriteHeaders(ref bw, m.Headers); @@ -361,6 +410,7 @@ public class AcBinaryHubProtocol : IHubProtocol case CompletionMessage m: bw.WriteByte(MsgCompletion); + WriteHeader(ref bw, m, streamedArg); bw.WriteStringUtf8(m.InvocationId!); WriteNullableString(ref bw, m.Error); bw.WriteByte(1); // hasResult = true @@ -583,6 +633,8 @@ public class AcBinaryHubProtocol : IHubProtocol private HubMessage ParseInvocation(ref SequenceReader r, IInvocationBinder binder) { + _currentHeaderContext = ReadHeader(ref r); + var invocationId = ReadNullableString(ref r); var target = ReadString(ref r); var paramTypes = binder.GetParameterTypes(target); @@ -601,6 +653,8 @@ public class AcBinaryHubProtocol : IHubProtocol private HubMessage ParseStreamInvocation(ref SequenceReader r, IInvocationBinder binder) { + _currentHeaderContext = ReadHeader(ref r); + var invocationId = ReadString(ref r); var target = ReadString(ref r); var paramTypes = binder.GetParameterTypes(target); @@ -616,6 +670,8 @@ public class AcBinaryHubProtocol : IHubProtocol private HubMessage ParseStreamItem(ref SequenceReader r, IInvocationBinder binder) { + _currentHeaderContext = ReadHeader(ref r); + var invocationId = ReadString(ref r); var itemType = binder.GetStreamItemType(invocationId); var item = ReadSingleArgument(ref r, itemType); @@ -629,6 +685,8 @@ public class AcBinaryHubProtocol : IHubProtocol private HubMessage ParseCompletion(ref SequenceReader r, IInvocationBinder binder) { + _currentHeaderContext = ReadHeader(ref r); + var invocationId = ReadString(ref r); var error = ReadNullableString(ref r); @@ -816,7 +874,13 @@ public class AcBinaryHubProtocol : IHubProtocol // Find the placeholder arg and its target type var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder); - streamedType = ResolveStreamedArgType(streamedType); + + // Prefer type from WriteHeader (set in _currentHeaderContext by the dispatched Parse* method). + // Falls back to binder-provided type (base generic behavior). + if (_currentHeaderContext is Type headerType) + streamedType = headerType; + else + streamedType = ResolveStreamedArgType(streamedType); _logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}", streamedIndex, streamedType.Name); @@ -1085,7 +1149,7 @@ public class AcBinaryHubProtocol : IHubProtocol #region Write Framing Helpers [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value) + protected static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value) { if (value == null) { @@ -1159,7 +1223,7 @@ public class AcBinaryHubProtocol : IHubProtocol return value; } - private static string ReadString(ref SequenceReader r) + protected static string ReadString(ref SequenceReader r) { var byteCount = (int)ReadVarUInt(ref r); if (byteCount == 0) @@ -1171,7 +1235,7 @@ public class AcBinaryHubProtocol : IHubProtocol : Encoding.UTF8.GetString(bytes.ToArray()); } - private static string? ReadNullableString(ref SequenceReader r) + protected static string? ReadNullableString(ref SequenceReader r) { r.TryRead(out var marker); return marker == 0 ? null : ReadString(ref r); diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs index 0ca3c47..3e5fbe7 100644 --- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs @@ -1,26 +1,73 @@ using System; using System.Buffers; using AyCode.Core.Serializers.Binaries; +using Microsoft.AspNetCore.SignalR.Protocol; using Microsoft.Extensions.Logging; namespace AyCode.Services.SignalRs; /// -/// Project-specific binary protocol with SignalParams-aware argument deserialization. -/// Register this in PluginNopStartup.cs and AcSignalRClientBase instead of AcBinaryHubProtocol. +/// Project-specific binary protocol. +/// +/// Overrides the base / hooks to carry the +/// runtime type of the streamed / last data argument in each message. This is needed because +/// our OnReceiveMessage(int, int?, SignalParams, object) convention has the last argument +/// typed as object, so the binder can't tell the deserializer what concrete type to produce. +/// +/// With the header in place, the concrete type travels on the wire and is available before the +/// (non-streamed) data argument is read, and before the streamed argument's Task.Run starts. +/// There is no dependency on reading SignalParams first, so it works regardless of whether +/// SignalParams is inline or streamed. /// public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol { /// /// Parsed SignalParams from current message (arg[2]). - /// Used by ReadSingleArgument (arg[3]) for type-aware deserialization. - /// Thread-safe: SignalR processes messages sequentially per connection. + /// Still used for , which opts out of deserialization. /// private SignalParams? _currentSignalParams; public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { } public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { } + #region Header: per-message concrete type of the data argument + + /// + /// Writes the AssemblyQualifiedName of the concrete type of the data argument. + /// + /// When chunked mode is active, is the argument being streamed. + /// When non-chunked, we pick the last non-null argument from the message (project convention: + /// OnReceiveMessage(int, int?, SignalParams, object data) — the data arg is last). + /// + /// + protected override void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg) + { + var typeSource = streamedArg ?? GetDataArg(message); + var typeName = typeSource?.GetType().AssemblyQualifiedName; + WriteNullableString(ref bw, typeName); + } + + /// + /// Reads the type AQN and resolves it via . + /// Returns the resolved (or null if absent / unresolvable). + /// + protected override object? ReadHeader(ref SequenceReader r) + { + var typeName = ReadNullableString(ref r); + return typeName != null ? Type.GetType(typeName) : null; + } + + private static object? GetDataArg(HubMessage message) => message switch + { + InvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1], + StreamInvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1], + StreamItemMessage m => m.Item, + CompletionMessage m => m.HasResult ? m.Result : null, + _ => null + }; + + #endregion + protected override void OnArgumentRead(object? value, int index) { if (value is SignalParams sp) @@ -53,17 +100,13 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol return SequenceToByteArray(argSlice.Slice(1)); } - // IsRawBytesData: return raw bytes, consumer deserializes + // IsRawBytesData: return raw bytes, consumer deserializes later if (_currentSignalParams is { IsRawBytesData: true }) return SequenceToByteArray(argSlice); - // SignalDataType: resolve actual type for eager deserialization - if (targetType == typeof(object) && _currentSignalParams?.SignalDataType != null) - { - var dataType = Type.GetType(_currentSignalParams.SignalDataType); - if (dataType != null) - targetType = dataType; - } + // Type resolution: prefer concrete type from the per-message header + if (targetType == typeof(object) && _currentHeaderContext is Type headerType) + targetType = headerType; // Bytes mode: linearize to byte[] → ArrayBinaryInput (fastest deser, no segment overhead) if (_protocolMode == BinaryProtocolMode.Bytes) @@ -74,15 +117,4 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol return DeserializeFromSequence(argSlice, targetType, Options); } - - protected override Type ResolveStreamedArgType(Type binderType) - { - if (binderType == typeof(object) && _currentSignalParams?.SignalDataType != null) - { - var resolved = Type.GetType(_currentSignalParams.SignalDataType); - if (resolved != null) - return resolved; - } - return binderType; - } } diff --git a/AyCode.Services/SignalRs/ISignalParams.cs b/AyCode.Services/SignalRs/ISignalParams.cs index 32c53a4..384c1e9 100644 --- a/AyCode.Services/SignalRs/ISignalParams.cs +++ b/AyCode.Services/SignalRs/ISignalParams.cs @@ -34,12 +34,6 @@ public class SignalParams : ISignalParams /// public byte[]? Parameters { get; set; } - /// - /// AssemblyQualifiedName of the response data type. - /// Set by server before sending. Protocol uses this to deserialize directly to the target type. - /// - public string? SignalDataType { get; set; } - /// /// Client sets true when requesting raw byte[] (e.g. DataSource populate/merge). /// Server: reads this from client's SignalParams → serializes object → byte[] directly.