using System;
using System.Buffers;
using System.IO;
using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;

namespace AyCode.Services.SignalRs;

/// <summary>
/// Project-specific binary protocol.
/// <para>
/// Adds a per-message wire header (via the <see cref="WriteHeader"/> / <see cref="ReadHeader"/> hooks)
/// that expresses how the data argument (<c>args[^1]</c> by convention) should be handled by the client:
/// </para>
/// <list type="bullet">
///   <item><description><c>HasData</c>: the data arg is not null.</description></item>
///   <item><description><c>Streamed</c>: the data arg comes via CHUNK_DATA chunks (inline placeholder on the wire).</description></item>
///   <item><description><c>ConsumerDeserialize</c>: the client returns raw <c>byte[]</c> to the consumer
///   (IsRawBytesData flow — e.g. DataSource PopulateMerge).</description></item>
///   <item><description><c>HasType</c>: the concrete assembly-qualified name (AQN) of the data arg follows
///   (for typed deserialization).</description></item>
/// </list>
/// <para>
/// With this header the client no longer needs to inspect <c>SignalParams</c> in order to decide
/// how to treat the data arg — so it works even when <c>SignalParams</c> is itself streamed
/// (which happens when <c>args[^1]</c> is <c>byte[]</c> or null).
/// </para>
/// </summary>
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
{
    /// <summary>
    /// Parameterless constructor — creates the protocol with all-default options. See base class.
    /// </summary>
    public AyCodeBinaryHubProtocol() : base()
    {
    }

    /// <summary>
    /// Legacy constructor — delegates to the matching base constructor.
    /// Kept for backward compatibility.
    /// </summary>
    /// <param name="options">Serializer options forwarded to the base class.</param>
    /// <param name="protocolMode">Protocol mode forwarded to the base class.</param>
    /// <param name="logger">Optional logger forwarded to the base class.</param>
    public AyCodeBinaryHubProtocol(
        AcBinarySerializerOptions options,
        BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes,
        ILogger? logger = null)
        : base(options, protocolMode, logger)
    {
    }

    /// <summary>
    /// Primary constructor — accepts a fully-configured <see cref="AcBinaryHubProtocolOptions"/>.
    /// </summary>
    /// <param name="options">Protocol options forwarded to the base class.</param>
    public AyCodeBinaryHubProtocol(AcBinaryHubProtocolOptions options) : base(options)
    {
    }

    #region Wire header (per-message)

    /// <summary>
    /// Bit flags stored in the single header byte written by <see cref="WriteHeader"/>.
    /// </summary>
    [Flags]
    private enum DataFlags : byte
    {
        None = 0,

        /// <summary>The data arg is not null.</summary>
        HasData = 1 << 0,

        /// <summary>The data arg is delivered via CHUNK_DATA chunks.</summary>
        Streamed = 1 << 1,

        /// <summary>The client returns raw <c>byte[]</c> to the consumer (IsRawBytesData flow).</summary>
        ConsumerDeserialize = 1 << 2,

        /// <summary>A type AQN string follows the flags byte (for typed deserialization).</summary>
        HasType = 1 << 3,
    }

    /// <summary>
    /// Opaque context produced by <see cref="ReadHeader"/>. It is consumed by
    /// <see cref="ReadSingleArgument"/> and <see cref="ResolveStreamedArgType"/>, which both read it
    /// back from <c>_currentHeaderContext</c> (presumably stored there by the base class — confirm
    /// against <see cref="AcBinaryHubProtocol"/>).
    /// </summary>
    private sealed class HeaderContext
    {
        /// <summary>Flags decoded from the header byte.</summary>
        public DataFlags Flags { get; }

        /// <summary>Type resolved from the header's AQN, or null when absent or unresolvable.</summary>
        public Type? Type { get; }

        public HeaderContext(DataFlags flags, Type? type)
        {
            Flags = flags;
            Type = type;
        }
    }

    /// <summary>
    /// Writes the per-message header: one flags byte, followed by the data arg's AQN when
    /// <c>HasType</c> is set. See <see cref="DataFlags"/> for the semantics of each bit.
    /// </summary>
    /// <param name="bw">Output the header is written to.</param>
    /// <param name="message">Message whose data arg (see <see cref="GetDataArg"/>) is described.</param>
    /// <param name="streamedArg">The argument being sent in chunked mode, or null when none is.</param>
    protected override void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
    {
        var dataArg = GetDataArg(message);
        var flags = DataFlags.None;
        string? typeName = null;

        if (dataArg is not null)
        {
            flags |= DataFlags.HasData;

            // Streamed: chunked mode is active AND the streamed argument is the data arg itself.
            if (streamedArg is not null && ReferenceEquals(streamedArg, dataArg))
            {
                flags |= DataFlags.Streamed;
            }

            if (dataArg is byte[])
            {
                // ConsumerDeserialize: the client requested byte[] via IsRawBytesData and the data arg
                // is (or was pre-serialized to) byte[]. Detected via the sibling SignalParams arg
                // (project convention). byte[] needs no type info — either the base byte[] fast-path
                // or ConsumerDeserialize handles it.
                if (GetClientIsRawBytesData(message))
                {
                    flags |= DataFlags.ConsumerDeserialize;
                }
            }
            else
            {
                // HasType: for typed args the concrete AQN is written so the client can deserialize
                // directly (either inline or after the streamed chunks are gathered).
                flags |= DataFlags.HasType;
                typeName = dataArg.GetType().AssemblyQualifiedName;
            }
        }

        bw.WriteByte((byte)flags);

        if ((flags & DataFlags.HasType) != 0)
        {
            WriteNullableString(ref bw, typeName);
        }
    }

    /// <summary>
    /// Reads the per-message header and returns a <see cref="HeaderContext"/>.
    /// </summary>
    /// <param name="r">Reader positioned at the header's flags byte.</param>
    /// <returns>A <see cref="HeaderContext"/> carrying the decoded flags and the resolved type (if any).</returns>
    /// <exception cref="InvalidDataException">The flags byte is missing (truncated message).</exception>
    protected override object? ReadHeader(ref SequenceReader<byte> r)
    {
        // WriteHeader always emits the flags byte, so its absence means the frame is truncated.
        // Previously the failed read was ignored and the message was decoded as "no flags".
        if (!r.TryRead(out byte flagsByte))
        {
            throw new InvalidDataException("Truncated message: missing data-flags header byte.");
        }

        var flags = (DataFlags)flagsByte;
        Type? resolvedType = null;

        if ((flags & DataFlags.HasType) != 0)
        {
            var typeName = ReadNullableString(ref r);
            if (typeName is not null)
            {
                // NOTE(review): typeName comes straight from the wire and is later used as the
                // deserialization target. If the peer is not fully trusted, restrict this to an
                // allow-list of expected types instead of resolving arbitrary names.
                // An unresolvable name yields null and the binder-provided type is used instead.
                resolvedType = Type.GetType(typeName);
            }
        }

        return new HeaderContext(flags, resolvedType);
    }

    /// <summary>
    /// The data arg by project convention — the last argument for invocation and stream-invocation
    /// messages, <c>Item</c> for stream items, <c>Result</c> for completions that have a result.
    /// Returns null for every other message shape.
    /// </summary>
    private static object? GetDataArg(HubMessage message) => message switch
    {
        InvocationMessage m when m.Arguments.Length > 0 => m.Arguments[^1],
        StreamInvocationMessage m when m.Arguments.Length > 0 => m.Arguments[^1],
        StreamItemMessage m => m.Item,
        CompletionMessage m => m.HasResult ? m.Result : null,
        _ => null,
    };

    /// <summary>
    /// Extracts <c>SignalParams.IsRawBytesData</c> from the message, assuming the project
    /// convention of <c>OnReceiveMessage(int, int?, SignalParams, object)</c> — arg[2] is SignalParams.
    /// Returns false for messages that don't follow this shape.
    /// </summary>
    private static bool GetClientIsRawBytesData(HubMessage message)
    {
        return message is InvocationMessage im
               && im.Arguments.Length >= 3
               && im.Arguments[2] is SignalParams sp
               && sp.IsRawBytesData;
    }

    #endregion

    /// <summary>
    /// For the chunked streaming path: resolve the type the streamed argument is deserialized into.
    /// Prefers the concrete type from the wire header (set by <see cref="ReadHeader"/>) when present,
    /// otherwise falls back to the binder-provided type (base behavior).
    /// </summary>
    /// <param name="binderType">Type supplied by the invocation binder.</param>
    /// <returns>The header type when one was resolved; otherwise the base class's choice.</returns>
    protected override Type ResolveStreamedArgType(Type binderType)
    {
        if (_currentHeaderContext is HeaderContext { Type: { } headerType })
        {
            return headerType;
        }

        return base.ResolveStreamedArgType(binderType);
    }

    /// <summary>
    /// Reads a single argument, using the per-message header to decide how to treat object-typed args.
    /// Decision order:
    /// <list type="number">
    ///   <item><description>Base byte[] fast-path — tag <c>BinaryTypeCode.ByteArray</c> present
    ///   (file/image/raw bytes from a typed byte[] param).</description></item>
    ///   <item><description>Header <c>ConsumerDeserialize</c> — return raw bytes (consumer handles
    ///   deserialization later).</description></item>
    ///   <item><description>Header <c>HasType</c> — resolve the target type from the header and deserialize.</description></item>
    ///   <item><description>Fall through to typed deserialization against the binder-provided target type.</description></item>
    /// </list>
    /// </summary>
    /// <param name="r">Reader positioned at the argument's INT32 little-endian length prefix.</param>
    /// <param name="targetType">Binder-provided target type for the argument.</param>
    /// <returns>
    /// Null for a null argument, <c>StreamedArgPlaceholder</c> for a streamed argument, a
    /// <c>byte[]</c> for raw-bytes arguments, otherwise the deserialized value.
    /// </returns>
    /// <exception cref="InvalidDataException">
    /// The length prefix is missing, is negative (other than the -1 streamed marker), or exceeds the
    /// bytes remaining in the message.
    /// </exception>
    protected override object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
    {
        // A missing length prefix used to be silently decoded as a null argument.
        if (!r.TryReadLittleEndian(out int argLength))
        {
            throw new InvalidDataException("Truncated message: missing argument length prefix.");
        }

        if (argLength == 0)
        {
            return null;
        }

        // AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization.
        if (argLength == -1)
        {
            return StreamedArgPlaceholder;
        }

        // Reject corrupt lengths up front instead of letting Slice/Advance fail with an
        // ArgumentOutOfRangeException that hides the real cause.
        if (argLength < 0 || argLength > r.Remaining)
        {
            throw new InvalidDataException(
                $"Invalid argument length {argLength}; {r.Remaining} byte(s) remain in the message.");
        }

        // A single zero byte is an alternative encoding of null.
        if (argLength == 1 && r.TryPeek(out byte marker) && marker == 0)
        {
            r.Advance(1);
            return null;
        }

        var argSlice = r.UnreadSequence.Slice(0, argLength);
        r.Advance(argLength);

        // 1. Base byte[] fast-path: [ByteArray tag][raw bytes] — strip the tag, return byte[].
        var argReader = new SequenceReader<byte>(argSlice);
        if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
        {
            return SequenceToByteArray(argSlice.Slice(1));
        }

        // Steps 2 and 3 apply only to the data arg (convention: targetType == typeof(object));
        // typed args (Int32, SignalParams, etc.) are unaffected.
        if (targetType == typeof(object) && _currentHeaderContext is HeaderContext hctx)
        {
            // 2. Header ConsumerDeserialize: no tag on the wire (isAcBinary path on the server),
            //    the consumer wants raw byte[] — return as-is without deserialization.
            if ((hctx.Flags & DataFlags.ConsumerDeserialize) != 0)
            {
                return SequenceToByteArray(argSlice);
            }

            // 3. Type resolution: prefer the concrete type from the header over the binder type.
            if (hctx.Type is not null)
            {
                targetType = hctx.Type;
            }
        }

        // 4. Deserialize — unified ArrayBinaryInput path via GetArgBytes.
        //    Single-segment: zero-copy on the pipe's slab. Multi-segment: ArrayPool-rented copy.
        //    _protocolMode no longer affects receive — it is only a send-side strategy.
        var (arr, offset, length, rented) = GetArgBytes(argSlice);
        try
        {
            return AcBinaryDeserializer.Deserialize(arr, offset, length, targetType, Options);
        }
        finally
        {
            if (rented)
            {
                ArrayPool<byte>.Shared.Return(arr);
            }
        }
    }
}