diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
index b6ecce2..cdf9d9d 100644
--- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
+++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs
@@ -875,12 +875,9 @@ public class AcBinaryHubProtocol : IHubProtocol
// Find the placeholder arg and its target type
var (args, streamedIndex, streamedType) = FindStreamedArgSlot(partialMessage, binder);
- // Prefer type from WriteHeader (set in _currentHeaderContext by the dispatched Parse* method).
- // Falls back to binder-provided type (base generic behavior).
- if (_currentHeaderContext is Type headerType)
- streamedType = headerType;
- else
- streamedType = ResolveStreamedArgType(streamedType);
+ // Derived classes can override ResolveStreamedArgType to consult _currentHeaderContext
+ // (set by ReadHeader) or any other per-message state.
+ streamedType = ResolveStreamedArgType(streamedType);
_logger?.LogDebug("ParseAsyncChunkStart chunk mode activated streamedIndex={StreamedIndex} streamedType={StreamedType}",
streamedIndex, streamedType.Name);
diff --git a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs
index 3e5fbe7..c379d3a 100644
--- a/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs
+++ b/AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs
@@ -9,54 +9,109 @@ namespace AyCode.Services.SignalRs;
///
/// Project-specific binary protocol.
///
-/// Overrides the base / hooks to carry the
-/// runtime type of the streamed / last data argument in each message. This is needed because
-/// our OnReceiveMessage(int, int?, SignalParams, object) convention has the last argument
-/// typed as object, so the binder can't tell the deserializer what concrete type to produce.
+/// Adds a per-message wire header (via / hooks)
+/// that expresses how the data argument (args[^1] by convention) should be handled by the client:
+///
+/// - HasData: the data arg is not null.
+/// - Streamed: the data arg comes via CHUNK_DATA chunks (inline placeholder on the wire).
+/// - ConsumerDeserialize: the client returns raw byte[] to the consumer (IsRawBytesData flow — e.g. DataSource PopulateMerge).
+/// - HasType: the concrete AQN of the data arg follows (for typed deserialization).
+///
///
-/// With the header in place, the concrete type travels on the wire and is available before the
-/// (non-streamed) data argument is read, and before the streamed argument's Task.Run starts.
-/// There is no dependency on reading SignalParams first, so it works regardless of whether
-/// SignalParams is inline or streamed.
+/// With this header the client no longer needs to inspect SignalParams in order to decide
+/// how to treat the data arg — so it works even when SignalParams is itself streamed
+/// (which happens when args[^1] is byte[] or null).
///
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
{
- ///
- /// Parsed SignalParams from current message (arg[2]).
- /// Still used for , which opts out of deserialization.
- ///
- private SignalParams? _currentSignalParams;
-
public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes, ILogger? logger = null) : base(options, protocolMode, logger) { }
- #region Header: per-message concrete type of the data argument
+ #region Wire header (per-message)
+
+ [Flags]
+ private enum DataFlags : byte
+ {
+ None = 0,
+ HasData = 1 << 0, // the data arg is not null
+ Streamed = 1 << 1, // the data arg is delivered via CHUNK_DATA chunks
+ ConsumerDeserialize = 1 << 2, // client returns raw byte[] to the consumer (IsRawBytesData flow)
+ HasType = 1 << 3, // a type AQN string follows (for typed deserialization)
+ }
///
- /// Writes the AssemblyQualifiedName of the concrete type of the data argument.
- ///
- /// When chunked mode is active, is the argument being streamed.
- /// When non-chunked, we pick the last non-null argument from the message (project convention:
- /// OnReceiveMessage(int, int?, SignalParams, object data) — the data arg is last).
- ///
+ /// Opaque context produced by and stashed in
+ /// . Consumed by
+ /// and .
+ ///
+ private sealed class HeaderContext
+ {
+ public DataFlags Flags { get; }
+ public Type? Type { get; }
+ public HeaderContext(DataFlags flags, Type? type) { Flags = flags; Type = type; }
+ }
+
+ ///
+ /// Writes the per-message header. See for the semantics of each bit.
///
protected override void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
{
- var typeSource = streamedArg ?? GetDataArg(message);
- var typeName = typeSource?.GetType().AssemblyQualifiedName;
- WriteNullableString(ref bw, typeName);
+ var dataArg = GetDataArg(message);
+ var flags = DataFlags.None;
+ string? typeName = null;
+
+ if (dataArg != null)
+ {
+ flags |= DataFlags.HasData;
+
+ // Streamed: chunked mode active AND streamedArg is the data arg
+ if (streamedArg != null && ReferenceEquals(streamedArg, dataArg))
+ flags |= DataFlags.Streamed;
+
+ // ConsumerDeserialize: the client requested byte[] via IsRawBytesData
+ // and the data arg is (or was pre-serialized to) byte[].
+ // Detected via the sibling SignalParams arg (project convention).
+ if (dataArg is byte[] && GetClientIsRawBytesData(message))
+ flags |= DataFlags.ConsumerDeserialize;
+
+ // HasType: for typed args we write the concrete AQN so the client can deserialize
+ // directly (either inline or after streamed chunks are gathered).
+ // byte[] needs no type info — either base byte[] fast-path or ConsumerDeserialize handles it.
+ if (dataArg is not byte[])
+ {
+ flags |= DataFlags.HasType;
+ typeName = dataArg.GetType().AssemblyQualifiedName;
+ }
+ }
+
+ bw.WriteByte((byte)flags);
+ if ((flags & DataFlags.HasType) != 0)
+ WriteNullableString(ref bw, typeName);
}
///
- /// Reads the type AQN and resolves it via .
- /// Returns the resolved (or null if absent / unresolvable).
+ /// Reads the per-message header and returns a .
///
protected override object? ReadHeader(ref SequenceReader r)
{
- var typeName = ReadNullableString(ref r);
- return typeName != null ? Type.GetType(typeName) : null;
+ r.TryRead(out byte flagsByte);
+ var flags = (DataFlags)flagsByte;
+
+ Type? resolvedType = null;
+ if ((flags & DataFlags.HasType) != 0)
+ {
+ var typeName = ReadNullableString(ref r);
+ if (typeName != null)
+ resolvedType = Type.GetType(typeName);
+ }
+
+ return new HeaderContext(flags, resolvedType);
}
+ ///
+ /// The data arg by project convention — the last argument for Invocation messages,
+ /// for stream items, for completions.
+ ///
private static object? GetDataArg(HubMessage message) => message switch
{
InvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1],
@@ -66,14 +121,43 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
_ => null
};
- #endregion
-
- protected override void OnArgumentRead(object? value, int index)
+ ///
+ /// Extracts from the message, assuming the project
+ /// convention of OnReceiveMessage(int, int?, SignalParams, object) — arg[2] is SignalParams.
+ /// Returns false for messages that don't follow this shape.
+ ///
+ private static bool GetClientIsRawBytesData(HubMessage message)
{
- if (value is SignalParams sp)
- _currentSignalParams = sp;
+ if (message is InvocationMessage im && im.Arguments.Length >= 3
+ && im.Arguments[2] is SignalParams sp)
+ return sp.IsRawBytesData;
+ return false;
}
+ #endregion
+
+ ///
+ /// For the chunked streaming path: resolve the type the background Task will deserialize into.
+ /// Prefers the concrete type from the wire header (set by ) when present,
+ /// otherwise falls back to the binder-provided type (base behavior).
+ ///
+ protected override Type ResolveStreamedArgType(Type binderType)
+ {
+ if (_currentHeaderContext is HeaderContext hctx && hctx.Type != null)
+ return hctx.Type;
+ return base.ResolveStreamedArgType(binderType);
+ }
+
+ ///
+ /// Read a single argument, using the per-message header to decide how to treat object-typed args.
+ /// Decision order:
+ ///
+ /// - Base byte[] fast-path — tag 0x44 present (file/image/raw bytes from a typed byte[] param).
+ /// - Header — return raw bytes (consumer handles deserialization later).
+ /// - Header — resolve target type from the header and deserialize.
+ /// - Fall through to base typed deserialization against the binder-provided target type.
+ ///
+ ///
protected override object? ReadSingleArgument(ref SequenceReader r, Type targetType)
{
r.TryReadLittleEndian(out int argLength);
@@ -93,22 +177,26 @@ public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
var argSlice = r.UnreadSequence.Slice(0, argLength);
r.Advance(argLength);
- // byte[] fast-path: tag only, no VarUInt (argLength implies size)
+ // 1. Base byte[] fast-path: [0x44 tag][raw bytes] — strip tag, return byte[]
var argReader = new SequenceReader(argSlice);
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
- {
return SequenceToByteArray(argSlice.Slice(1));
- }
- // IsRawBytesData: return raw bytes, consumer deserializes later
- if (_currentSignalParams is { IsRawBytesData: true })
+ var hctx = _currentHeaderContext as HeaderContext;
+
+ // 2. Header ConsumerDeserialize: no tag on wire (isAcBinary path on server),
+ // consumer wants raw byte[] — return as-is without deserialization.
+ // Applies only to the data arg (convention: targetType == typeof(object));
+ // typed args (Int32, SignalParams, etc.) are unaffected.
+ if (targetType == typeof(object)
+ && hctx != null && (hctx.Flags & DataFlags.ConsumerDeserialize) != 0)
return SequenceToByteArray(argSlice);
- // Type resolution: prefer concrete type from the per-message header
- if (targetType == typeof(object) && _currentHeaderContext is Type headerType)
- targetType = headerType;
+ // 3. Type resolution: prefer concrete type from header over binder type (which is often typeof(object))
+ if (targetType == typeof(object) && hctx?.Type != null)
+ targetType = hctx.Type;
- // Bytes mode: linearize to byte[] → ArrayBinaryInput (fastest deser, no segment overhead)
+ // 4. Deserialize — Bytes mode linearizes, Segment/AsyncSegment uses the sequence directly
if (_protocolMode == BinaryProtocolMode.Bytes)
{
var bytes = SequenceToByteArray(argSlice);