using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Text; using AyCode.Core.Serializers.Binaries; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Protocol; namespace AyCode.Services.SignalRs; /// /// Custom SignalR hub protocol using AcBinarySerializer for wire format. /// Eliminates JSON+Base64 overhead by serializing all HubMessages directly to binary. /// /// Wire format per message: /// [4 bytes: payload length (little-endian)] [payload bytes] /// /// Payload structure: /// [1 byte: message type] [message-specific fields serialized via AcBinary] /// /// Message types map 1:1 to SignalR HubMessageType values. /// Arguments are serialized individually with an INT32 length prefix each, /// enabling deferred deserialization via IHubProtocol's binder pattern. /// /// Write path: BufferWriterBinaryOutput for zero virtual dispatch on the hot path. /// Argument payloads serialized directly to the pipe via AcBinarySerializer (zero-copy write). /// /// Read path: SequenceReader<byte> reads directly from the pipe's ReadOnlySequence. /// Argument deserialization uses the pipe's backing byte[] via TryGetArray (zero-copy read). 
///
public class AcBinaryHubProtocol : IHubProtocol
{
    // Size in bytes of the little-endian INT32 length prefixes (outer frame and per-argument).
    private const int LengthPrefixSize = 4;

    // A 32-bit VarUInt occupies at most five 7-bit groups.
    private const int MaxVarUIntBytes = 5;

    // Message type markers (matching HubMessageType enum values)
    private const byte MsgInvocation = 1;
    private const byte MsgStreamItem = 2;
    private const byte MsgCompletion = 3;
    private const byte MsgStreamInvocation = 4;
    private const byte MsgCancelInvocation = 5;
    private const byte MsgPing = 6;
    private const byte MsgClose = 7;
    private const byte MsgAck = 8;
    private const byte MsgSequence = 9;

    protected volatile AcBinarySerializerOptions _options;
    protected readonly BinaryProtocolMode _protocolMode;

    /// <summary>
    /// Creates the protocol with <c>AcBinarySerializerOptions.Default</c> and the default protocol mode.
    /// </summary>
    public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default)
    {
    }

    /// <summary>
    /// Creates the protocol with explicit serializer options and argument transport mode.
    /// </summary>
    /// <param name="options">Serializer options used for argument payloads.</param>
    /// <param name="protocolMode">How argument payloads are written to and read from the transport.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public AcBinaryHubProtocol(AcBinarySerializerOptions options, BinaryProtocolMode protocolMode = BinaryProtocolMode.Bytes)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));

        // NOTE(review): this writes to the instance supplied by the caller (including the shared
        // Default instance used by the parameterless constructor) - confirm that is intended.
        _options.BufferWriterChunkSize = 4096;
        _protocolMode = protocolMode;
    }

    /// <summary>
    /// Runtime-replaceable serializer options.
    /// Backed by a volatile field, so a replacement is picked up the next time the field is read.
    /// </summary>
    /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
    public AcBinarySerializerOptions Options
    {
        get => _options;
        set => _options = value ?? throw new ArgumentNullException(nameof(value));
    }

    public string Name => "acbinary";

    public int Version => 1;

    public TransferFormat TransferFormat => TransferFormat.Binary;

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsVersionSupported(int version) => version <= Version;

    #region WriteMessage

    /// <summary>
    /// Serializes <paramref name="message"/> into a freshly allocated buffer, including the outer length prefix.
    /// </summary>
    /// <param name="message">The hub message to serialize.</param>
    /// <returns>The framed message bytes.</returns>
    public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
    {
        // The initial capacity only avoids a resize for small messages; WriteMessage patches the
        // length prefixes through WrittenMemory, so a later growth of the writer is harmless.
        var writer = new ArrayBufferWriter<byte>(_options.BufferWriterChunkSize + LengthPrefixSize);
        WriteMessage(message, writer);
        return writer.WrittenMemory;
    }

    /// <summary>
    /// Writes <paramref name="message"/> to <paramref name="output"/> as
    /// <c>[4-byte little-endian payload length][payload]</c>.
    /// </summary>
    /// <param name="message">The hub message to serialize.</param>
    /// <param name="output">Destination buffer writer.</param>
    /// <exception cref="HubException">The message type is not one handled by this protocol.</exception>
    public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
    {
        // An ArrayBufferWriter reallocates when it grows, which orphans spans handed out earlier.
        // Remember where the prefix lives so it can be patched through WrittenMemory afterwards.
        var prefixOffset = output is ArrayBufferWriter<byte> growable ? growable.WrittenCount : 0;

        // Reserve outer length prefix directly on the output (before BWO takes over)
        var lengthSpan = output.GetSpan(LengthPrefixSize);
        output.Advance(LengthPrefixSize);

        var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize);
        var externalBytes = 0;

        switch (message)
        {
            case InvocationMessage m:
                WriteInvocation(ref bw, output, m, ref externalBytes);
                break;
            case StreamInvocationMessage m:
                WriteStreamInvocation(ref bw, output, m, ref externalBytes);
                break;
            case StreamItemMessage m:
                WriteStreamItem(ref bw, output, m, ref externalBytes);
                break;
            case CompletionMessage m:
                WriteCompletion(ref bw, output, m, ref externalBytes);
                break;
            case CancelInvocationMessage m:
                WriteCancelInvocation(ref bw, m);
                break;
            case PingMessage:
                bw.WriteByte(MsgPing);
                break;
            case CloseMessage m:
                WriteClose(ref bw, m);
                break;
            case AckMessage m:
                bw.WriteByte(MsgAck);
                bw.WriteRaw(m.SequenceId);
                break;
            case SequenceMessage m:
                bw.WriteByte(MsgSequence);
                bw.WriteRaw(m.SequenceId);
                break;
            default:
                throw new HubException($"Unexpected message type: {message.GetType().Name}");
        }

        // Payload = bytes that went through bw + bytes written straight to the output by WriteArgument.
        var totalPayload = bw.Position + externalBytes;
        bw.Flush();
        WriteLengthPrefix(output, lengthSpan, prefixOffset, totalPayload);
    }

    /// <summary>
    /// Stores a previously reserved 4-byte little-endian length prefix.
    /// For an <see cref="ArrayBufferWriter{T}"/> the value is written through its current
    /// <c>WrittenMemory</c> at <paramref name="offset"/>; for any other writer it is written
    /// to the span that was reserved up front.
    /// </summary>
    private static void WriteLengthPrefix(IBufferWriter<byte> output, Span<byte> reserved, int offset, int value)
    {
        if (output is ArrayBufferWriter<byte> growable
            && MemoryMarshal.TryGetArray(growable.WrittenMemory, out var written))
        {
            System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(
                written.AsSpan(offset, LengthPrefixSize), value);
            return;
        }

        System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(reserved, value);
    }

    private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
    {
        bw.WriteByte(MsgInvocation);
        WriteNullableString(ref bw, m.InvocationId);
        bw.WriteStringUtf8(m.Target);
        WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
        WriteStringArray(ref bw, m.StreamIds);
        WriteHeaders(ref bw, m.Headers);
    }

    private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamInvocationMessage m, ref int externalBytes)
    {
        bw.WriteByte(MsgStreamInvocation);
        bw.WriteStringUtf8(m.InvocationId!);
        bw.WriteStringUtf8(m.Target);
        WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
        WriteStringArray(ref bw, m.StreamIds);
        WriteHeaders(ref bw, m.Headers);
    }

    private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamItemMessage m, ref int externalBytes)
    {
        bw.WriteByte(MsgStreamItem);
        bw.WriteStringUtf8(m.InvocationId!);
        WriteArgument(ref bw, output, m.Item, ref externalBytes);
        WriteHeaders(ref bw, m.Headers);
    }

    private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, CompletionMessage m, ref int externalBytes)
    {
        bw.WriteByte(MsgCompletion);
        bw.WriteStringUtf8(m.InvocationId!);
        WriteNullableString(ref bw, m.Error);

        var hasResult = m.HasResult;
        bw.WriteByte(hasResult ? (byte)1 : (byte)0);
        if (hasResult)
        {
            WriteArgument(ref bw, output, m.Result, ref externalBytes);
        }

        WriteHeaders(ref bw, m.Headers);
    }

    private static void WriteCancelInvocation(ref BufferWriterBinaryOutput bw, CancelInvocationMessage m)
    {
        bw.WriteByte(MsgCancelInvocation);
        bw.WriteStringUtf8(m.InvocationId!);
        WriteHeaders(ref bw, m.Headers);
    }

    private static void WriteClose(ref BufferWriterBinaryOutput bw, CloseMessage m)
    {
        bw.WriteByte(MsgClose);
        WriteNullableString(ref bw, m.Error);
        bw.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0);
    }

    #endregion

    #region TryParseMessage

    /// <summary>
    /// Attempts to parse one framed message from the start of <paramref name="input"/>.
    /// </summary>
    /// <param name="input">
    /// Buffered transport data. Left untouched when a complete frame is not yet available;
    /// otherwise advanced past the frame, even when the frame yields no message.
    /// </param>
    /// <param name="binder">Supplies parameter, stream item and return types for argument deserialization.</param>
    /// <param name="message">The parsed message, or null.</param>
    /// <returns>
    /// True when a message was produced. False when more data is needed, or when the frame was
    /// consumed but was empty or carried an unknown message type.
    /// </returns>
    /// <exception cref="System.IO.InvalidDataException">The frame declares a negative payload length.</exception>
    public virtual bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
    {
        message = null;

        var header = new SequenceReader<byte>(input);
        if (!header.TryReadLittleEndian(out int payloadLength))
        {
            return false;
        }

        if (payloadLength < 0)
        {
            throw new System.IO.InvalidDataException($"Invalid message payload length: {payloadLength}");
        }

        if (header.Remaining < payloadLength)
        {
            return false;
        }

        // Parse from a reader bounded to this frame so a malformed field can never read into the
        // next frame, and so that "Remaining" inside the parsers refers to this payload only.
        var payload = input.Slice(LengthPrefixSize, payloadLength);
        var reader = new SequenceReader<byte>(payload);
        message = ParseMessage(ref reader, payloadLength, binder);

        input = input.Slice(payload.End);
        return message != null;
    }

    /// <summary>
    /// Reads the message type byte and dispatches to the matching parser.
    /// Returns null for an empty payload or an unrecognised message type.
    /// </summary>
    private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder)
    {
        if (payloadLength == 0 || !r.TryRead(out byte msgType))
        {
            return null;
        }

        return msgType switch
        {
            MsgInvocation => ParseInvocation(ref r, binder),
            MsgStreamInvocation => ParseStreamInvocation(ref r, binder),
            MsgStreamItem => ParseStreamItem(ref r, binder),
            MsgCompletion => ParseCompletion(ref r, binder),
            MsgCancelInvocation => ParseCancelInvocation(ref r),
            MsgPing => PingMessage.Instance,
            MsgClose => ParseClose(ref r),
            MsgAck => new AckMessage(ReadInt64(ref r)),
            MsgSequence => new SequenceMessage(ReadInt64(ref r)),
            _ => null
        };
    }

    /// <summary>
    /// Diagnostic logger for protocol-level debugging.
    /// Set to non-null to log target method, arg count, param types during ParseInvocation.
    /// The logging call sites are compiled only when DEBUG is defined.
    /// </summary>
    public static Action<string>? DiagnosticLogger { get; set; }

    [Conditional("DEBUG")]
    private static void LogDiagnostic(string message) => DiagnosticLogger?.Invoke(message);

    [Conditional("DEBUG")]
    private static void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType)
    {
        // Read the static once so a concurrent reset to null cannot slip in between check and call.
        var log = DiagnosticLogger;
        if (log is null)
        {
            return;
        }

        var segmentCount = 0;
        foreach (var _ in argSlice)
        {
            segmentCount++;
        }

        log($"[AcBinaryHubProtocol] ReadSingleArgument: argLength={argLength}, isSingleSegment={argSlice.IsSingleSegment}, segments={segmentCount}, type={targetType.Name}");
    }

    [Conditional("DEBUG")]
    private static void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining)
    {
        var log = DiagnosticLogger;
        if (log is null)
        {
            return;
        }

        var typeNames = new string[paramTypes.Count];
        for (var i = 0; i < paramTypes.Count; i++)
        {
            typeNames[i] = paramTypes[i].Name;
        }

        log($"[AcBinaryHubProtocol] ParseInvocation target='{target}'; paramTypes.Count={paramTypes.Count}; types=[{string.Join(", ", typeNames)}]; remaining={remaining}");
    }

    private HubMessage ParseInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
    {
        var invocationId = ReadNullableString(ref r);
        var target = ReadString(ref r);
        var paramTypes = binder.GetParameterTypes(target);
        LogParseInvocation(target, paramTypes, r.Remaining);

        var args = ReadArguments(ref r, paramTypes);
        var streamIds = ReadStringArray(ref r);
        var headers = ReadHeaders(ref r);

        // ReadStringArray yields null for an empty list, and both invocationId and streamIds are
        // passed straight through, so the four-argument constructor covers every combination.
        var msg = new InvocationMessage(invocationId, target, args, streamIds);
        if (headers != null)
        {
            SetHeaders(msg, headers);
        }

        return msg;
    }

    private HubMessage ParseStreamInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
    {
        var invocationId = ReadString(ref r);
        var target = ReadString(ref r);
        var paramTypes = binder.GetParameterTypes(target);
        var args = ReadArguments(ref r, paramTypes);
        var streamIds = ReadStringArray(ref r);
        var headers = ReadHeaders(ref r);

        var msg = new StreamInvocationMessage(invocationId, target, args, streamIds);
        if (headers != null)
        {
            SetHeaders(msg, headers);
        }

        return msg;
    }

    private HubMessage ParseStreamItem(ref SequenceReader<byte> r, IInvocationBinder binder)
    {
        var invocationId = ReadString(ref r);
        var itemType = binder.GetStreamItemType(invocationId);
        var item = ReadSingleArgument(ref r, itemType);
        var headers = ReadHeaders(ref r);

        var msg = new StreamItemMessage(invocationId, item);
        if (headers != null)
        {
            SetHeaders(msg, headers);
        }

        return msg;
    }

    private HubMessage ParseCompletion(ref SequenceReader<byte> r, IInvocationBinder binder)
    {
        var invocationId = ReadString(ref r);
        var error = ReadNullableString(ref r);
        r.TryRead(out byte hasResultByte);
        var hasResult = hasResultByte == 1;

        object? result = null;
        if (hasResult)
        {
            var resultType = binder.GetReturnType(invocationId);
            result = ReadSingleArgument(ref r, resultType);
        }

        var headers = ReadHeaders(ref r);

        // An error takes precedence over a result when both are present on the wire.
        CompletionMessage msg;
        if (error != null)
        {
            msg = CompletionMessage.WithError(invocationId, error);
        }
        else if (hasResult)
        {
            msg = CompletionMessage.WithResult(invocationId, result);
        }
        else
        {
            msg = CompletionMessage.Empty(invocationId);
        }

        if (headers != null)
        {
            SetHeaders(msg, headers);
        }

        return msg;
    }

    private static HubMessage ParseCancelInvocation(ref SequenceReader<byte> r)
    {
        var invocationId = ReadString(ref r);
        var headers = ReadHeaders(ref r);

        var msg = new CancelInvocationMessage(invocationId);
        if (headers != null)
        {
            SetHeaders(msg, headers);
        }

        return msg;
    }

    private static HubMessage ParseClose(ref SequenceReader<byte> r)
    {
        var error = ReadNullableString(ref r);
        r.TryRead(out byte reconnectByte);
        var allowReconnect = reconnectByte == 1;
        return new CloseMessage(error, allowReconnect);
    }

    #endregion

    #region Argument Serialization

    private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes)
    {
        bw.WriteVarUInt((uint)arguments.Length);
        for (var i = 0; i < arguments.Length; i++)
        {
            WriteArgument(ref bw, output, arguments[i], ref externalBytes);
        }
    }

    /// <summary>
    /// Writes one argument as <c>[INT32 length][bytes]</c>.
    /// byte[] values and Bytes mode go through <paramref name="bw"/>; the other modes serialize
    /// straight to <paramref name="output"/> and add the bytes written there to
    /// <paramref name="externalBytes"/>.
    /// </summary>
    private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes)
    {
        // byte[] fast-path: size known upfront, write entirely through BWO
        if (value is byte[] byteArray)
        {
            var isAcBinary = byteArray.Length >= 2
                             && byteArray[0] == AcBinarySerializerOptions.FormatVersion
                             && (byteArray[1] & 0xF0) == BinaryTypeCode.HeaderFlagsBase;

            if (isAcBinary)
            {
                // Already AcBinary-serialized: write raw length + bytes, no tag wrapper
                bw.WriteRaw(byteArray.Length);
            }
            else
            {
                // Raw byte[] (image, file, etc.): tag + raw bytes, no VarUInt (argLength implies size)
                bw.WriteRaw(1 + byteArray.Length);
                bw.WriteByte(BinaryTypeCode.ByteArray);
            }

            bw.WriteBytes(byteArray);
            return;
        }

        // Bytes mode: serialize to byte[], write through BWO (no FlushAndReset needed)
        if (_protocolMode == BinaryProtocolMode.Bytes)
        {
            var serialized = AcBinarySerializer.Serialize(value, _options);
            bw.WriteRaw(serialized.Length);
            bw.WriteBytes(serialized);
            return;
        }

        // Segment / AsyncSegment: serialize directly to the output
        bw.FlushAndReset();

        // Same stale-span concern as the outer prefix: remember the offset for ArrayBufferWriter.
        var argPrefixOffset = output is ArrayBufferWriter<byte> growable ? growable.WrittenCount : 0;

        // Reserve arg length prefix directly on the output
        var argLenSpan = output.GetSpan(LengthPrefixSize);
        output.Advance(LengthPrefixSize);

        // The PipeWriter overload is only usable when the output really is a PipeWriter
        // (GetMessageBytes passes an ArrayBufferWriter); otherwise use the IBufferWriter overload.
        var argBytes = _protocolMode == BinaryProtocolMode.AsyncSegment && output is System.IO.Pipelines.PipeWriter pipe
            ? AcBinarySerializer.Serialize(value, pipe, _options)
            : AcBinarySerializer.Serialize(value, output, _options);

        WriteLengthPrefix(output, argLenSpan, argPrefixOffset, argBytes);
        externalBytes += LengthPrefixSize + argBytes;
    }

    private object?[] ReadArguments(ref SequenceReader<byte> r, IReadOnlyList<Type> paramTypes)
    {
        var count = ReadCount(ref r);
        LogDiagnostic($"[AcBinaryHubProtocol] ReadArguments count={count}; remaining={r.Remaining}");

        var args = new object?[count];
        for (var i = 0; i < count; i++)
        {
            // Arguments beyond the binder's parameter list are deserialized as object.
            var targetType = i < paramTypes.Count ? paramTypes[i] : typeof(object);
            LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}");
            args[i] = ReadSingleArgument(ref r, targetType);
            OnArgumentRead(args[i], i);
        }

        return args;
    }

    /// <summary>
    /// Hook invoked after each argument of an invocation has been read. The base implementation does nothing.
    /// </summary>
    protected virtual void OnArgumentRead(object? value, int index)
    {
    }

    /// <summary>
    /// Reads one length-prefixed argument and deserializes it to <paramref name="targetType"/>.
    /// A zero length, or a single zero byte, is read as null. A payload that starts with the
    /// <c>BinaryTypeCode.ByteArray</c> tag is returned as a raw byte[] without the tag.
    /// In Bytes mode the payload is linearized to a byte[] before deserialization; otherwise it
    /// is deserialized from the sequence slice.
    /// </summary>
    /// <exception cref="System.IO.InvalidDataException">
    /// The declared argument length is negative or exceeds the remaining data.
    /// </exception>
    protected virtual object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
    {
        // A missing prefix leaves argLength at 0 and is treated like a null argument.
        r.TryReadLittleEndian(out int argLength);
        if (argLength == 0)
        {
            return null;
        }

        if (argLength < 0 || argLength > r.Remaining)
        {
            throw new System.IO.InvalidDataException($"Invalid argument length: {argLength}");
        }

        // Null marker check
        if (argLength == 1 && r.TryPeek(out byte marker) && marker == 0)
        {
            r.Advance(1);
            return null;
        }

        // Slice the argument out of the sequence (no copy at this point)
        var argSlice = r.UnreadSequence.Slice(0, argLength);
        r.Advance(argLength);
        LogReadSingleArgument(argSlice, argLength, targetType);

        // byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag ->
        // strip tag, rest is raw payload. No VarUInt length (argLength implies size).
        var argReader = new SequenceReader<byte>(argSlice);
        if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
        {
            return SequenceToByteArray(argSlice.Slice(1));
        }

        // Bytes mode: linearize to byte[] and deserialize from the array
        if (_protocolMode == BinaryProtocolMode.Bytes)
        {
            var bytes = SequenceToByteArray(argSlice);
            return AcBinaryDeserializer.Deserialize(bytes, targetType, _options);
        }

        return DeserializeFromSequence(argSlice, targetType, _options);
    }

    /// <summary>
    /// Returns the content of <paramref name="data"/> as a byte[].
    /// The backing array is returned as-is only when the sequence is a single segment that covers
    /// that array exactly (offset 0, full length); in every other case the data is copied.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    protected static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
    {
        if (data.IsSingleSegment
            && MemoryMarshal.TryGetArray(data.First, out var seg)
            && seg.Offset == 0
            && seg.Count == seg.Array!.Length)
        {
            return seg.Array;
        }

        return data.ToArray();
    }

    /// <summary>
    /// Deserializes <paramref name="data"/> to <paramref name="targetType"/> via the
    /// sequence overload of <c>AcBinaryDeserializer.Deserialize</c>.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    protected static object? DeserializeFromSequence(ReadOnlySequence<byte> data, Type targetType, AcBinarySerializerOptions options)
        => AcBinaryDeserializer.Deserialize(data, targetType, options);

    #endregion

    #region Write Framing Helpers

    /// <summary>Writes a presence byte (0 = null, 1 = value) followed by the string when present.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value)
    {
        if (value == null)
        {
            bw.WriteByte(0);
            return;
        }

        bw.WriteByte(1);
        bw.WriteStringUtf8(value);
    }

    /// <summary>Writes a VarUInt element count followed by each string; null and empty both write 0.</summary>
    private static void WriteStringArray(ref BufferWriterBinaryOutput bw, string[]? array)
    {
        if (array == null || array.Length == 0)
        {
            bw.WriteVarUInt(0);
            return;
        }

        bw.WriteVarUInt((uint)array.Length);
        for (var i = 0; i < array.Length; i++)
        {
            bw.WriteStringUtf8(array[i]);
        }
    }

    /// <summary>Writes a VarUInt pair count followed by key/value strings; null and empty both write 0.</summary>
    private static void WriteHeaders(ref BufferWriterBinaryOutput bw, IDictionary<string, string>? headers)
    {
        if (headers == null || headers.Count == 0)
        {
            bw.WriteVarUInt(0);
            return;
        }

        bw.WriteVarUInt((uint)headers.Count);
        foreach (var kv in headers)
        {
            bw.WriteStringUtf8(kv.Key);
            bw.WriteStringUtf8(kv.Value);
        }
    }

    /// <summary>Number of bytes a value occupies when encoded in 7-bit groups.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static int VarUIntSize(uint value)
    {
        if (value < 0x80) return 1;
        if (value < 0x4000) return 2;
        if (value < 0x200000) return 3;
        if (value < 0x10000000) return 4;
        return 5;
    }

    #endregion

    #region Sequence Read Helpers

    /// <summary>Reads a little-endian INT64; yields 0 when fewer than 8 bytes remain.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static long ReadInt64(ref SequenceReader<byte> r)
    {
        r.TryReadLittleEndian(out long v);
        return v;
    }

    /// <summary>
    /// Reads a 7-bit-group encoded unsigned integer (low group first, high bit = continuation).
    /// If the data ends before a terminating byte, the value accumulated so far is returned.
    /// </summary>
    /// <exception cref="System.IO.InvalidDataException">The encoding runs past five bytes.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    protected static uint ReadVarUInt(ref SequenceReader<byte> r)
    {
        uint value = 0;
        for (var shift = 0; r.TryRead(out byte b); shift += 7)
        {
            // Without this guard the shift count would wrap at 32 and corrupt the value.
            if (shift >= MaxVarUIntBytes * 7)
            {
                throw new System.IO.InvalidDataException("VarUInt is longer than 5 bytes.");
            }

            value |= (uint)(b & 0x7F) << shift;
            if ((b & 0x80) == 0)
            {
                return value;
            }
        }

        return value;
    }

    /// <summary>
    /// Reads a VarUInt element count and rejects values that cannot fit in the remaining data
    /// (every element occupies at least one byte), so a corrupt count cannot trigger a huge allocation.
    /// </summary>
    /// <exception cref="System.IO.InvalidDataException">The count exceeds the remaining byte count.</exception>
    private static int ReadCount(ref SequenceReader<byte> r)
    {
        var count = ReadVarUInt(ref r);
        if (count > int.MaxValue || count > r.Remaining)
        {
            throw new System.IO.InvalidDataException($"Invalid element count: {count}");
        }

        return (int)count;
    }

    /// <summary>Reads a VarUInt byte count followed by that many UTF-8 bytes.</summary>
    /// <exception cref="System.IO.InvalidDataException">The declared byte count exceeds the remaining data.</exception>
    private static string ReadString(ref SequenceReader<byte> r)
    {
        var byteCount = ReadVarUInt(ref r);
        if (byteCount == 0)
        {
            return string.Empty;
        }

        if (byteCount > int.MaxValue
            || byteCount > r.Remaining
            || !r.TryReadExact((int)byteCount, out var bytes))
        {
            throw new System.IO.InvalidDataException($"Invalid string length: {byteCount}");
        }

        return bytes.IsSingleSegment
            ? Encoding.UTF8.GetString(bytes.FirstSpan)
            : Encoding.UTF8.GetString(bytes.ToArray());
    }

    /// <summary>Reads a presence byte; 0 (or no data) means null, anything else is followed by a string.</summary>
    private static string? ReadNullableString(ref SequenceReader<byte> r)
    {
        r.TryRead(out byte marker);
        return marker == 0 ? null : ReadString(ref r);
    }

    /// <summary>Reads a counted list of strings; a count of 0 yields null.</summary>
    private static string[]? ReadStringArray(ref SequenceReader<byte> r)
    {
        var count = ReadCount(ref r);
        if (count == 0)
        {
            return null;
        }

        var array = new string[count];
        for (var i = 0; i < count; i++)
        {
            array[i] = ReadString(ref r);
        }

        return array;
    }

    /// <summary>
    /// Reads a counted list of key/value string pairs. Returns null when no data remains
    /// or the count is 0. A repeated key keeps the last value read.
    /// </summary>
    private static Dictionary<string, string>? ReadHeaders(ref SequenceReader<byte> r)
    {
        if (r.Remaining == 0)
        {
            return null;
        }

        var count = ReadCount(ref r);
        if (count == 0)
        {
            return null;
        }

        var headers = new Dictionary<string, string>(count, StringComparer.Ordinal);
        for (var i = 0; i < count; i++)
        {
            var key = ReadString(ref r);
            var value = ReadString(ref r);
            headers[key] = value;
        }

        return headers;
    }

    #endregion

    #region Helpers

    /// <summary>Assigns <paramref name="headers"/> when the message type carries headers.</summary>
    private static void SetHeaders(HubMessage msg, Dictionary<string, string> headers)
    {
        if (msg is HubInvocationMessage invMsg)
        {
            invMsg.Headers = headers;
        }
    }

    #endregion
}