using System;
using System.Buffers;
using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
namespace AyCode.Services.SignalRs;
///
/// Project-specific binary protocol.
///
/// Adds a per-message wire header (via / hooks)
/// that expresses how the data argument (args[^1] by convention) should be handled by the client:
///
/// - HasData: the data arg is not null.
/// - Streamed: the data arg comes via CHUNK_DATA chunks (inline placeholder on the wire).
/// - ConsumerDeserialize: the client returns raw byte[] to the consumer (IsRawBytesData flow — e.g. DataSource PopulateMerge).
/// - HasType: the concrete AQN of the data arg follows (for typed deserialization).
///
///
/// With this header the client no longer needs to inspect SignalParams in order to decide
/// how to treat the data arg — so it works even when SignalParams is itself streamed
/// (which happens when args[^1] is byte[] or null).
///
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
{
///
/// Parameterless constructor — creates the protocol with all-default options. See base class.
///
public AyCodeBinaryHubProtocol() : base() { }
///
/// Primary constructor — accepts a fully-configured .
///
public AyCodeBinaryHubProtocol(AcBinaryHubProtocolOptions options) : base(options) { }
#region Wire header (per-message)
[Flags]
private enum DataFlags : byte
{
None = 0,
HasData = 1 << 0, // the data arg is not null
Streamed = 1 << 1, // the data arg is delivered via CHUNK_DATA chunks
ConsumerDeserialize = 1 << 2, // client returns raw byte[] to the consumer (IsRawBytesData flow)
HasType = 1 << 3, // a type AQN string follows (for typed deserialization)
}
///
/// Opaque context produced by and threaded through Parse* /
/// ReadArguments / ReadSingleArgument as a parameter (or persisted on the per-binder
/// AsyncChunkState for the chunked path). Consumed by
/// and . Stack-only
/// in flight — no shared instance state, race-mentes on a shared protocol instance.
///
private sealed class HeaderContext
{
public DataFlags Flags { get; }
public Type? Type { get; }
public HeaderContext(DataFlags flags, Type? type) { Flags = flags; Type = type; }
}
///
/// Writes the per-message header. See for the semantics of each bit.
///
protected override void WriteHeader(ref BufferWriterBinaryOutput bw, HubMessage message, object? streamedArg)
{
var dataArg = GetDataArg(message);
var flags = DataFlags.None;
string? typeName = null;
if (dataArg != null)
{
flags |= DataFlags.HasData;
// Streamed: chunked mode active AND streamedArg is the data arg
if (streamedArg != null && ReferenceEquals(streamedArg, dataArg))
flags |= DataFlags.Streamed;
// ConsumerDeserialize: the client requested byte[] via IsRawBytesData
// and the data arg is (or was pre-serialized to) byte[].
// Detected via the sibling SignalParams arg (project convention).
if (dataArg is byte[] && GetClientIsRawBytesData(message))
flags |= DataFlags.ConsumerDeserialize;
// HasType: for typed args we write the concrete AQN so the client can deserialize
// directly (either inline or after streamed chunks are gathered).
// byte[] needs no type info — either base byte[] fast-path or ConsumerDeserialize handles it.
if (dataArg is not byte[])
{
flags |= DataFlags.HasType;
typeName = dataArg.GetType().AssemblyQualifiedName;
}
}
bw.WriteByte((byte)flags);
if ((flags & DataFlags.HasType) != 0)
WriteNullableString(ref bw, typeName);
}
///
/// Reads the per-message header and returns a .
///
protected override object? ReadHeader(ref SequenceReader r)
{
r.TryRead(out byte flagsByte);
var flags = (DataFlags)flagsByte;
Type? resolvedType = null;
if ((flags & DataFlags.HasType) != 0)
{
var typeName = ReadNullableString(ref r);
if (typeName != null)
resolvedType = Type.GetType(typeName);
}
return new HeaderContext(flags, resolvedType);
}
///
/// The data arg by project convention — the last argument for Invocation messages,
/// for stream items, for completions.
///
private static object? GetDataArg(HubMessage message) => message switch
{
InvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1],
StreamInvocationMessage m when m.Arguments.Length > 0 => m.Arguments[m.Arguments.Length - 1],
StreamItemMessage m => m.Item,
CompletionMessage m => m.HasResult ? m.Result : null,
_ => null
};
///
/// Extracts from the message, assuming the project
/// convention of OnReceiveMessage(int, int?, SignalParams, object) — arg[2] is SignalParams.
/// Returns false for messages that don't follow this shape.
///
private static bool GetClientIsRawBytesData(HubMessage message)
{
if (message is InvocationMessage im && im.Arguments.Length >= 3
&& im.Arguments[2] is SignalParams sp)
return sp.IsRawBytesData;
return false;
}
#endregion
///
/// For the chunked streaming path: resolve the type the background Task will deserialize into.
/// Prefers the concrete type from the wire header (set by ) when present,
/// otherwise falls back to the binder-provided type (base behavior).
///
protected override Type ResolveStreamedArgType(Type binderType, object? headerContext)
{
if (headerContext is HeaderContext hctx && hctx.Type != null)
return hctx.Type;
return base.ResolveStreamedArgType(binderType, headerContext);
}
///
/// Read a single argument, using the per-message header to decide how to treat object-typed args.
/// Decision order:
///
/// - Base byte[] fast-path — tag 0x44 present (file/image/raw bytes from a typed byte[] param).
/// - Header — return raw bytes (consumer handles deserialization later).
/// - Header — resolve target type from the header and deserialize.
/// - Fall through to base typed deserialization against the binder-provided target type.
///
///
protected override object? ReadSingleArgument(ref SequenceReader r, Type targetType, object? headerContext)
{
r.TryReadLittleEndian(out int argLength);
if (argLength == 0)
return null;
// AsyncSegment: streamed arg marker (INT32 -1) → placeholder for chunked deserialization
if (argLength == -1)
return StreamedArgPlaceholder;
if (argLength == 1)
{
r.TryPeek(out byte marker);
if (marker == 0) { r.Advance(1); return null; }
}
var argSlice = r.UnreadSequence.Slice(0, argLength);
r.Advance(argLength);
// 1. Base byte[] fast-path: [0x44 tag][raw bytes] — strip tag, return byte[]
var argReader = new SequenceReader(argSlice);
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
return SequenceToByteArray(argSlice.Slice(1));
var hctx = headerContext as HeaderContext;
// 2. Header ConsumerDeserialize: no tag on wire (isAcBinary path on server),
// consumer wants raw byte[] — return as-is without deserialization.
// Applies only to the data arg (convention: targetType == typeof(object));
// typed args (Int32, SignalParams, etc.) are unaffected.
if (targetType == typeof(object)
&& hctx != null && (hctx.Flags & DataFlags.ConsumerDeserialize) != 0)
return SequenceToByteArray(argSlice);
// 3. Type resolution: prefer concrete type from header over binder type (which is often typeof(object))
if (targetType == typeof(object) && hctx?.Type != null)
targetType = hctx.Type;
// 4. Deserialize — unified ArrayBinaryInput path via GetArgBytes.
// Single-segment: zero-copy on the pipe's slab. Multi-segment: ArrayPool-rented copy.
// _protocolMode no longer affects receive — it is only a send-side strategy.
var (arr, offset, length, rented) = GetArgBytes(argSlice);
try
{
return AcBinaryDeserializer.Deserialize(arr, offset, length, targetType, Options);
}
finally
{
if (rented) ArrayPool.Shared.Return(arr);
}
}
///
/// Application-level CHUNK_ABORT routing for the AyCode correlation
/// pattern. The base SignalR InvocationId is null on server-to-client SendAsync
/// (fire-and-forget at the SignalR layer), but the application encodes
/// (messageTag, requestId, SignalParams, data) into arg[0..3] of an
/// OnReceiveMessage callback. Synthesise a
/// response so the client's _responseByRequestId routing faults the awaiting Task with
/// a specific error instead of waiting for a transport-level timeout.
///
protected override HubMessage? OnChunkAbort(HubMessage partialMessage, object? headerContext, string? invocationId)
{
// Recognise the AyCode OnReceiveMessage(messageTag, requestId, SignalParams, data) shape —
// arg[1] is the application's correlation key, arg[2] is SignalParams.
if (partialMessage is InvocationMessage inv
&& inv.Arguments.Length >= 4
&& inv.Arguments[2] is SignalParams origParams)
{
var errorParams = new SignalParams
{
Status = SignalResponseStatus.Error,
IsRawBytesData = origParams.IsRawBytesData,
DataSerializerType = origParams.DataSerializerType
};
// Same target ("OnReceiveMessage") and args[0..1], SignalParams replaced with Error
// status, data arg (last) cleared — the abort means no data was actually produced.
// The client's OnReceiveMessage routes via _responseByRequestId[requestId] (arg[1])
// and the SignalResponseStatus.Error surfaces to the awaiting caller.
var args = new object?[inv.Arguments.Length];
Array.Copy(inv.Arguments, args, inv.Arguments.Length);
args[2] = errorParams;
args[inv.Arguments.Length - 1] = null;
_logger?.LogDebug("OnChunkAbort synthesised SignalResponseStatus.Error response messageTag={MessageTag} requestId={RequestId}",
inv.Arguments[0], inv.Arguments[1]);
return new InvocationMessage(inv.Target, args);
}
// Unknown shape — defer to base (SignalR InvocationId routing or null/Ping fallback).
return base.OnChunkAbort(partialMessage, headerContext, invocationId);
}
}