[LOADED_DOCS: .github\copilot-instructions.md]
Refactor AyCodeBinaryHubProtocol header logic Refactored the per-message header to use a DataFlags enum, encoding data argument properties in a single byte for more expressive client handling. Introduced HeaderContext to encapsulate header state. Updated WriteHeader and ReadHeader to use the new format, and refactored ReadSingleArgument to support fast-paths for byte[], ConsumerDeserialize, and header-supplied types. Removed obsolete _currentSignalParams logic and improved documentation throughout.
This commit is contained in:
parent
19c470251d
commit
d0ab01d08e
|
|
@ -875,11 +875,8 @@ public class AcBinaryHubProtocol : IHubProtocol
|
||||||
// Find the placeholder arg and its target type
|
// Find the placeholder arg and its target type
|
||||||
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
|
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
|
||||||
|
|
||||||
// Prefer type from WriteHeader (set in _currentHeaderContext by the dispatched Parse* method).
|
// Derived classes can override ResolveStreamedArgType to consult _currentHeaderContext
|
||||||
// Falls back to binder-provided type (base generic behavior).
|
// (set by ReadHeader) or any other per-message state.
|
||||||
if (_currentHeaderContext is Type headerType)
|
|
||||||
streamedType = headerType;
|
|
||||||
else
|
|
||||||
streamedType = ResolveStreamedArgType(streamedType);
|
streamedType = ResolveStreamedArgType(streamedType);
|
||||||
|
|
||||||
_logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
|
_logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
|
||||||
|
|
|
||||||
|
|
@ -9,54 +9,109 @@ namespace AyCode.Services.SignalRs;
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Project-specific binary protocol.
|
/// Project-specific binary protocol.
|
||||||
///
|
///
|
||||||
/// Overrides the base <see cref="WriteHeader"/>/<see cref="ReadHeader"/> hooks to carry the
|
/// Adds a per-message wire header (via <see cref="WriteHeader"/>/<see cref="ReadHeader"/> hooks)
|
||||||
/// runtime type of the streamed / last data argument in each message. This is needed because
|
/// that expresses how the data argument (args[^1] by convention) should be handled by the client:
|
||||||
/// our <c>OnReceiveMessage(int, int?, SignalParams, object)</c> convention has the last argument
|
/// <list type="bullet">
|
||||||
/// typed as <c>object</c>, so the binder can't tell the deserializer what concrete type to produce.
|
/// <item><c>HasData</c>: the data arg is not null.</item>
|
||||||
|
/// <item><c>Streamed</c>: the data arg comes via CHUNK_DATA chunks (inline placeholder on the wire).</item>
|
||||||
|
/// <item><c>ConsumerDeserialize</c>: the client returns raw <c>byte[]</c> to the consumer (IsRawBytesData flow — e.g. DataSource PopulateMerge).</item>
|
||||||
|
/// <item><c>HasType</c>: the concrete AQN of the data arg follows (for typed deserialization).</item>
|
||||||
|
/// </list>
|
||||||
///
|
///
|
||||||
/// With the header in place, the concrete type travels on the wire and is available before the
|
/// With this header the client no longer needs to inspect <c>SignalParams</c> in order to decide
|
||||||
/// (non-streamed) data argument is read, and before the streamed argument's Task.Run starts.
|
/// how to treat the data arg — so it works even when <c>SignalParams</c> is itself streamed
|
||||||
/// There is no dependency on reading <c>SignalParams</c> first, so it works regardless of whether
|
/// (which happens when args[^1] is <c>byte[]</c> or <c>null</c>).
|
||||||
/// <c>SignalParams</c> is inline or streamed.
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
|
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
|
||||||
{
|
{
|
||||||
/// <summary>
|
|
||||||
/// Parsed SignalParams from current message (arg[2]).
|
|
||||||
/// Still used for <see cref="SignalParams.IsRawBytesData"/>, which opts out of deserialization.
|
|
||||||
/// </summary>
|
|
||||||
private SignalParams? _currentSignalParams;
|
|
||||||
|
|
||||||
public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
|
public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
|
||||||
public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { }
|
public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { }
|
||||||
|
|
||||||
#region Header: per-message concrete type of the data argument
|
#region Wire header (per-message)
|
||||||
|
|
||||||
|
[Flags]
|
||||||
|
private enum DataFlags : byte
|
||||||
|
{
|
||||||
|
None = 0,
|
||||||
|
HasData = 1 << 0, // the data arg is not null
|
||||||
|
Streamed = 1 << 1, // the data arg is delivered via CHUNK_DATA chunks
|
||||||
|
ConsumerDeserialize = 1 << 2, // client returns raw byte[] to the consumer (IsRawBytesData flow)
|
||||||
|
HasType = 1 << 3, // a type AQN string follows (for typed deserialization)
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Writes the AssemblyQualifiedName of the concrete type of the data argument.
|
/// Opaque context produced by <see cref="ReadHeader"/> and stashed in
|
||||||
/// <para>
|
/// <see cref="AcBinaryHubProtocol._currentHeaderContext"/>. Consumed by
|
||||||
/// When chunked mode is active, <paramref name="streamedArg"/> is the argument being streamed.
|
/// <see cref="ReadSingleArgument"/> and <see cref="ResolveStreamedArgType"/>.
|
||||||
/// When non-chunked, we pick the last non-null argument from the message (project convention:
|
/// </summary>
|
||||||
/// <c>OnReceiveMessage(int, int?, SignalParams, object data)</c> — the data arg is last).
|
private sealed class HeaderContext
|
||||||
/// </para>
|
{
|
||||||
|
public DataFlags Flags { get; }
|
||||||
|
public Type? Type { get; }
|
||||||
|
public HeaderContext(DataFlags flags, Type? type) { Flags = flags; Type = type; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Writes the per-message header. See <see cref="DataFlags"/> for the semantics of each bit.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
protected override void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
|
protected override void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
|
||||||
{
|
{
|
||||||
var typeSource = streamedArg ?? GetDataArg(message);
|
var dataArg = GetDataArg(message);
|
||||||
var typeName = typeSource?.GetType().AssemblyQualifiedName;
|
var flags = DataFlags.None;
|
||||||
|
string? typeName = null;
|
||||||
|
|
||||||
|
if (dataArg != null)
|
||||||
|
{
|
||||||
|
flags |= DataFlags.HasData;
|
||||||
|
|
||||||
|
// Streamed: chunked mode active AND streamedArg is the data arg
|
||||||
|
if (streamedArg != null && ReferenceEquals(streamedArg, dataArg))
|
||||||
|
flags |= DataFlags.Streamed;
|
||||||
|
|
||||||
|
// ConsumerDeserialize: the client requested byte[] via IsRawBytesData
|
||||||
|
// and the data arg is (or was pre-serialized to) byte[].
|
||||||
|
// Detected via the sibling SignalParams arg (project convention).
|
||||||
|
if (dataArg is byte[] && GetClientIsRawBytesData(message))
|
||||||
|
flags |= DataFlags.ConsumerDeserialize;
|
||||||
|
|
||||||
|
// HasType: for typed args we write the concrete AQN so the client can deserialize
|
||||||
|
// directly (either inline or after streamed chunks are gathered).
|
||||||
|
// byte[] needs no type info — either base byte[] fast-path or ConsumerDeserialize handles it.
|
||||||
|
if (dataArg is not byte[])
|
||||||
|
{
|
||||||
|
flags |= DataFlags.HasType;
|
||||||
|
typeName = dataArg.GetType().AssemblyQualifiedName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bw.WriteByte((byte)flags);
|
||||||
|
if ((flags & DataFlags.HasType) != 0)
|
||||||
WriteNullableString(ref bw, typeName);
|
WriteNullableString(ref bw, typeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Reads the type AQN and resolves it via <see cref="Type.GetType(string)"/>.
|
/// Reads the per-message header and returns a <see cref="HeaderContext"/>.
|
||||||
/// Returns the resolved <see cref="Type"/> (or null if absent / unresolvable).
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
protected override object? ReadHeader(ref SequenceReader<byte> r)
|
protected override object? ReadHeader(ref SequenceReader<byte> r)
|
||||||
|
{
|
||||||
|
r.TryRead(out byte flagsByte);
|
||||||
|
var flags = (DataFlags)flagsByte;
|
||||||
|
|
||||||
|
Type? resolvedType = null;
|
||||||
|
if ((flags & DataFlags.HasType) != 0)
|
||||||
{
|
{
|
||||||
var typeName = ReadNullableString(ref r);
|
var typeName = ReadNullableString(ref r);
|
||||||
return typeName != null ? Type.GetType(typeName) : null;
|
if (typeName != null)
|
||||||
|
resolvedType = Type.GetType(typeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return new HeaderContext(flags, resolvedType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The data arg by project convention — the last argument for Invocation messages,
|
||||||
|
/// <see cref="StreamItemMessage.Item"/> for stream items, <see cref="CompletionMessage.Result"/> for completions.
|
||||||
|
/// </summary>
|
||||||
private static object? GetDataArg(HubMessage message) => message switch
|
private static object? GetDataArg(HubMessage message) => message switch
|
||||||
{
|
{
|
||||||
InvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1],
|
InvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1],
|
||||||
|
|
@ -66,14 +121,43 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
|
||||||
_ => null
|
_ => null
|
||||||
};
|
};
|
||||||
|
|
||||||
#endregion
|
/// <summary>
|
||||||
|
/// Extracts <see cref="SignalParams.IsRawBytesData"/> from the message, assuming the project
|
||||||
protected override void OnArgumentRead(object? value, int index)
|
/// convention of <c>OnReceiveMessage(int, int?, SignalParams, object)</c> — arg[2] is SignalParams.
|
||||||
|
/// Returns false for messages that don't follow this shape.
|
||||||
|
/// </summary>
|
||||||
|
private static bool GetClientIsRawBytesData(HubMessage message)
|
||||||
{
|
{
|
||||||
if (value is SignalParams sp)
|
if (message is InvocationMessage im && im.Arguments.Length >= 3
|
||||||
_currentSignalParams = sp;
|
&& im.Arguments[2] is SignalParams sp)
|
||||||
|
return sp.IsRawBytesData;
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// For the chunked streaming path: resolve the type the background Task will deserialize into.
|
||||||
|
/// Prefers the concrete type from the wire header (set by <see cref="ReadHeader"/>) when present,
|
||||||
|
/// otherwise falls back to the binder-provided type (base behavior).
|
||||||
|
/// </summary>
|
||||||
|
protected override Type ResolveStreamedArgType(Type binderType)
|
||||||
|
{
|
||||||
|
if (_currentHeaderContext is HeaderContext hctx && hctx.Type != null)
|
||||||
|
return hctx.Type;
|
||||||
|
return base.ResolveStreamedArgType(binderType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Read a single argument, using the per-message header to decide how to treat <c>object</c>-typed args.
|
||||||
|
/// Decision order:
|
||||||
|
/// <list type="number">
|
||||||
|
/// <item>Base byte[] fast-path — tag <c>0x44</c> present (file/image/raw bytes from a typed <c>byte[]</c> param).</item>
|
||||||
|
/// <item>Header <see cref="DataFlags.ConsumerDeserialize"/> — return raw bytes (consumer handles deserialization later).</item>
|
||||||
|
/// <item>Header <see cref="DataFlags.HasType"/> — resolve target type from the header and deserialize.</item>
|
||||||
|
/// <item>Fall through to base typed deserialization against the binder-provided target type.</item>
|
||||||
|
/// </list>
|
||||||
|
/// </summary>
|
||||||
protected override object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
|
protected override object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
|
||||||
{
|
{
|
||||||
r.TryReadLittleEndian(out int argLength);
|
r.TryReadLittleEndian(out int argLength);
|
||||||
|
|
@ -93,22 +177,26 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
|
||||||
var argSlice = r.UnreadSequence.Slice(0, argLength);
|
var argSlice = r.UnreadSequence.Slice(0, argLength);
|
||||||
r.Advance(argLength);
|
r.Advance(argLength);
|
||||||
|
|
||||||
// byte[] fast-path: tag only, no VarUInt (argLength implies size)
|
// 1. Base byte[] fast-path: [0x44 tag][raw bytes] — strip tag, return byte[]
|
||||||
var argReader = new SequenceReader<byte>(argSlice);
|
var argReader = new SequenceReader<byte>(argSlice);
|
||||||
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
|
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
|
||||||
{
|
|
||||||
return SequenceToByteArray(argSlice.Slice(1));
|
return SequenceToByteArray(argSlice.Slice(1));
|
||||||
}
|
|
||||||
|
|
||||||
// IsRawBytesData: return raw bytes, consumer deserializes later
|
var hctx = _currentHeaderContext as HeaderContext;
|
||||||
if (_currentSignalParams is { IsRawBytesData: true })
|
|
||||||
|
// 2. Header ConsumerDeserialize: no tag on wire (isAcBinary path on server),
|
||||||
|
// consumer wants raw byte[] — return as-is without deserialization.
|
||||||
|
// Applies only to the data arg (convention: targetType == typeof(object));
|
||||||
|
// typed args (Int32, SignalParams, etc.) are unaffected.
|
||||||
|
if (targetType == typeof(object)
|
||||||
|
&& hctx != null && (hctx.Flags & DataFlags.ConsumerDeserialize) != 0)
|
||||||
return SequenceToByteArray(argSlice);
|
return SequenceToByteArray(argSlice);
|
||||||
|
|
||||||
// Type resolution: prefer concrete type from the per-message header
|
// 3. Type resolution: prefer concrete type from header over binder type (which is often typeof(object))
|
||||||
if (targetType == typeof(object) && _currentHeaderContext is Type headerType)
|
if (targetType == typeof(object) && hctx?.Type != null)
|
||||||
targetType = headerType;
|
targetType = hctx.Type;
|
||||||
|
|
||||||
// Bytes mode: linearize to byte[] → ArrayBinaryInput (fastest deser, no segment overhead)
|
// 4. Deserialize — Bytes mode linearizes, Segment/AsyncSegment uses the sequence directly
|
||||||
if (_protocolMode == BinaryProtocolMode.Bytes)
|
if (_protocolMode == BinaryProtocolMode.Bytes)
|
||||||
{
|
{
|
||||||
var bytes = SequenceToByteArray(argSlice);
|
var bytes = SequenceToByteArray(argSlice);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue