[LOADED_DOCS: .github\copilot-instructions.md]

Refactor SignalR protocol type resolution logic

Removed SignalParams.SignalDataType and migrated type resolution to protocol headers using new WriteHeader/ReadHeader extensibility hooks. AyCodeBinaryHubProtocol now writes and reads the concrete data argument type in the message header, enabling correct deserialization of object-typed arguments. Updated AcBinaryHubProtocol to support header context and made relevant helpers protected. Cleaned up legacy SignalDataType logic and improved documentation.
This commit is contained in:
Loretta 2026-04-19 12:58:31 +02:00
parent 4343ab4d53
commit 19c470251d
6 changed files with 124 additions and 37 deletions

View File

@ -115,7 +115,6 @@ public class TestableSignalRHub2 : AcWebSignalRHubBase<TestSignalRTags, TestLogg
{
Status = status,
DataSerializerType = SerializerOptions.SerializerType,
SignalDataType = isRawBytes ? null : responseData?.GetType().AssemblyQualifiedName,
IsRawBytesData = isRawBytes
};

View File

@ -16,8 +16,7 @@ public abstract class AcSignalRSendToClientService<TSignalRHub, TSignalRTags, TL
var signalParams = new SignalParams
{
Status = SignalResponseStatus.Success,
DataSerializerType = AyCode.Core.Serializers.AcSerializerType.Binary,
SignalDataType = content?.GetType().AssemblyQualifiedName
DataSerializerType = AyCode.Core.Serializers.AcSerializerType.Binary
};
Logger.Info($"Server sending to client; {ConstHelper.NameByValue<TSignalRTags>(messageTag)}");

View File

@ -326,7 +326,6 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
{
Status = status,
DataSerializerType = SerializerOptions.SerializerType,
SignalDataType = isRawBytes ? null : responseData?.GetType().AssemblyQualifiedName,
IsRawBytesData = isRawBytes
};

View File

@ -66,6 +66,14 @@ public class AcBinaryHubProtocol : IHubProtocol
/// </summary>
private readonly ConditionalWeakTable<IInvocationBinder, AsyncChunkState> _chunkStates;
/// <summary>
/// Opaque context produced by <see cref="ReadHeader"/> for the currently-parsed message.
/// Set by parse methods (ParseInvocation, ParseStreamInvocation, ParseStreamItem, ParseCompletion)
/// right after reading the per-message header. Derived protocols can read this to customize
/// argument deserialization (e.g., type resolution when <c>targetType == typeof(object)</c>).
/// </summary>
protected object? _currentHeaderContext;
private sealed class AsyncChunkState
{
public HubMessage PartialMessage = null!;
@ -121,6 +129,40 @@ public class AcBinaryHubProtocol : IHubProtocol
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsVersionSupported(int version) => version <= Version;
#region Extensibility Hooks
/// <summary>
/// Called right after the message type byte (both chunked and non-chunked paths).
/// Derived protocols can write extra header fields here (e.g., a type AQN for untyped args).
/// <para>
/// Default implementation writes nothing — base protocol is fully generic and has no per-message
/// extra state. Derived classes <b>must</b> read exactly the same bytes in <see cref="ReadHeader"/>.
/// </para>
/// </summary>
/// <param name="bw">Output writer (same one used for the message payload).</param>
/// <param name="message">The message being written.</param>
/// <param name="streamedArg">
/// When the chunked path activates, this is the actual argument being streamed (so the derived
/// class can use its concrete runtime type). <c>null</c> for non-chunked messages.
/// </param>
protected virtual void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
{
// Base: no extra header.
}
/// <summary>
/// Reads the per-message header written by <see cref="WriteHeader"/> on the sender side.
/// Called right after the message type byte has been consumed.
/// <para>
/// Returns an opaque context object that is stored in <see cref="_currentHeaderContext"/>
/// for derived classes to consume during the rest of the parse.
/// Default implementation returns <c>null</c>.
/// </para>
/// </summary>
protected virtual object? ReadHeader(ref SequenceReader<byte> r) => null;
#endregion
#region WriteMessage
public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
@ -205,6 +247,7 @@ public class AcBinaryHubProtocol : IHubProtocol
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgInvocation);
WriteHeader(ref bw, m, streamedArg: null);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
@ -215,6 +258,7 @@ public class AcBinaryHubProtocol : IHubProtocol
private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamInvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamInvocation);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
@ -225,6 +269,7 @@ public class AcBinaryHubProtocol : IHubProtocol
private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamItemMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamItem);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
WriteArgument(ref bw, output, m.Item, ref externalBytes);
WriteHeaders(ref bw, m.Headers);
@ -233,6 +278,7 @@ public class AcBinaryHubProtocol : IHubProtocol
private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, CompletionMessage m, ref int externalBytes)
{
bw.WriteByte(MsgCompletion);
WriteHeader(ref bw, m, streamedArg: null);
bw.WriteStringUtf8(m.InvocationId!);
WriteNullableString(ref bw, m.Error);
@ -336,6 +382,7 @@ public class AcBinaryHubProtocol : IHubProtocol
{
case InvocationMessage m:
bw.WriteByte(MsgInvocation);
WriteHeader(ref bw, m, streamedArg);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
@ -345,6 +392,7 @@ public class AcBinaryHubProtocol : IHubProtocol
case StreamInvocationMessage m:
bw.WriteByte(MsgStreamInvocation);
WriteHeader(ref bw, m, streamedArg);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArgumentsChunked(ref bw, pipeWriter, m.Arguments, streamedArgIndex, ref externalBytes);
@ -354,6 +402,7 @@ public class AcBinaryHubProtocol : IHubProtocol
case StreamItemMessage m:
bw.WriteByte(MsgStreamItem);
WriteHeader(ref bw, m, streamedArg);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteRaw(-1); // streamed arg marker
WriteHeaders(ref bw, m.Headers);
@ -361,6 +410,7 @@ public class AcBinaryHubProtocol : IHubProtocol
case CompletionMessage m:
bw.WriteByte(MsgCompletion);
WriteHeader(ref bw, m, streamedArg);
bw.WriteStringUtf8(m.InvocationId!);
WriteNullableString(ref bw, m.Error);
bw.WriteByte(1); // hasResult = true
@ -583,6 +633,8 @@ public class AcBinaryHubProtocol : IHubProtocol
private HubMessage ParseInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadNullableString(ref r);
var target = ReadString(ref r);
var paramTypes = binder.GetParameterTypes(target);
@ -601,6 +653,8 @@ public class AcBinaryHubProtocol : IHubProtocol
private HubMessage ParseStreamInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadString(ref r);
var target = ReadString(ref r);
var paramTypes = binder.GetParameterTypes(target);
@ -616,6 +670,8 @@ public class AcBinaryHubProtocol : IHubProtocol
private HubMessage ParseStreamItem(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadString(ref r);
var itemType = binder.GetStreamItemType(invocationId);
var item = ReadSingleArgument(ref r, itemType);
@ -629,6 +685,8 @@ public class AcBinaryHubProtocol : IHubProtocol
private HubMessage ParseCompletion(ref SequenceReader<byte> r, IInvocationBinder binder)
{
_currentHeaderContext = ReadHeader(ref r);
var invocationId = ReadString(ref r);
var error = ReadNullableString(ref r);
@ -816,7 +874,13 @@ public class AcBinaryHubProtocol : IHubProtocol
// Find the placeholder arg and its target type
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
streamedType = ResolveStreamedArgType(streamedType);
// Prefer type from WriteHeader (set in _currentHeaderContext by the dispatched Parse* method).
// Falls back to binder-provided type (base generic behavior).
if (_currentHeaderContext is Type headerType)
streamedType = headerType;
else
streamedType = ResolveStreamedArgType(streamedType);
_logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
streamedIndex, streamedType.Name);
@ -1085,7 +1149,7 @@ public class AcBinaryHubProtocol : IHubProtocol
#region Write Framing Helpers
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value)
protected static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value)
{
if (value == null)
{
@ -1159,7 +1223,7 @@ public class AcBinaryHubProtocol : IHubProtocol
return value;
}
private static string ReadString(ref SequenceReader<byte> r)
protected static string ReadString(ref SequenceReader<byte> r)
{
var byteCount = (int)ReadVarUInt(ref r);
if (byteCount == 0)
@ -1171,7 +1235,7 @@ public class AcBinaryHubProtocol : IHubProtocol
: Encoding.UTF8.GetString(bytes.ToArray());
}
private static string? ReadNullableString(ref SequenceReader<byte> r)
protected static string? ReadNullableString(ref SequenceReader<byte> r)
{
r.TryRead(out var marker);
return marker == 0 ? null : ReadString(ref r);

View File

@ -1,26 +1,73 @@
using System;
using System.Buffers;
using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
namespace AyCode.Services.SignalRs;
/// <summary>
/// Project-specific binary protocol with SignalParams-aware argument deserialization.
/// Register this in PluginNopStartup.cs and AcSignalRClientBase instead of AcBinaryHubProtocol.
/// Project-specific binary protocol.
///
/// Overrides the base <see cref="WriteHeader"/>/<see cref="ReadHeader"/> hooks to carry the
/// runtime type of the streamed / last data argument in each message. This is needed because
/// our <c>OnReceiveMessage(int, int?, SignalParams, object)</c> convention has the last argument
/// typed as <c>object</c>, so the binder can't tell the deserializer what concrete type to produce.
///
/// With the header in place, the concrete type travels on the wire and is available before the
/// (non-streamed) data argument is read, and before the streamed argument's Task.Run starts.
/// There is no dependency on reading <c>SignalParams</c> first, so it works regardless of whether
/// <c>SignalParams</c> is inline or streamed.
/// </summary>
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
{
/// <summary>
/// Parsed SignalParams from current message (arg[2]).
/// Used by ReadSingleArgument (arg[3]) for type-aware deserialization.
/// Thread-safe: SignalR processes messages sequentially per connection.
/// Still used for <see cref="SignalParams.IsRawBytesData"/>, which opts out of deserialization.
/// </summary>
private SignalParams? _currentSignalParams;
public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { }
#region Header: per-message concrete type of the data argument
/// <summary>
/// Writes the AssemblyQualifiedName of the concrete type of the data argument.
/// <para>
/// When chunked mode is active, <paramref name="streamedArg"/> is the argument being streamed.
/// When non-chunked, we pick the last non-null argument from the message (project convention:
/// <c>OnReceiveMessage(int, int?, SignalParams, object data)</c> — the data arg is last).
/// </para>
/// </summary>
protected override void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
{
var typeSource = streamedArg ?? GetDataArg(message);
var typeName = typeSource?.GetType().AssemblyQualifiedName;
WriteNullableString(ref bw, typeName);
}
/// <summary>
/// Reads the type AQN and resolves it via <see cref="Type.GetType(string)"/>.
/// Returns the resolved <see cref="Type"/> (or null if absent / unresolvable).
/// </summary>
protected override object? ReadHeader(ref SequenceReader<byte> r)
{
var typeName = ReadNullableString(ref r);
return typeName != null ? Type.GetType(typeName) : null;
}
private static object? GetDataArg(HubMessage message) => message switch
{
InvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1],
StreamInvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1],
StreamItemMessage m => m.Item,
CompletionMessage m => m.HasResult ? m.Result : null,
_ => null
};
#endregion
protected override void OnArgumentRead(object? value, int index)
{
if (value is SignalParams sp)
@ -53,17 +100,13 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
return SequenceToByteArray(argSlice.Slice(1));
}
// IsRawBytesData: return raw bytes, consumer deserializes
// IsRawBytesData: return raw bytes, consumer deserializes later
if (_currentSignalParams is { IsRawBytesData: true })
return SequenceToByteArray(argSlice);
// SignalDataType: resolve actual type for eager deserialization
if (targetType == typeof(object) && _currentSignalParams?.SignalDataType != null)
{
var dataType = Type.GetType(_currentSignalParams.SignalDataType);
if (dataType != null)
targetType = dataType;
}
// Type resolution: prefer concrete type from the per-message header
if (targetType == typeof(object) && _currentHeaderContext is Type headerType)
targetType = headerType;
// Bytes mode: linearize to byte[] → ArrayBinaryInput (fastest deser, no segment overhead)
if (_protocolMode == BinaryProtocolMode.Bytes)
@ -74,15 +117,4 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
return DeserializeFromSequence(argSlice, targetType, Options);
}
protected override Type ResolveStreamedArgType(Type binderType)
{
if (binderType == typeof(object) && _currentSignalParams?.SignalDataType != null)
{
var resolved = Type.GetType(_currentSignalParams.SignalDataType);
if (resolved != null)
return resolved;
}
return binderType;
}
}

View File

@ -34,12 +34,6 @@ public class SignalParams : ISignalParams
/// </summary>
public byte[]? Parameters { get; set; }
/// <summary>
/// AssemblyQualifiedName of the response data type.
/// Set by server before sending. Protocol uses this to deserialize directly to the target type.
/// </summary>
public string? SignalDataType { get; set; }
/// <summary>
/// Client sets true when requesting raw byte[] (e.g. DataSource populate/merge).
/// Server: reads this from client's SignalParams → serializes object → byte[] directly.