AyCode.Core/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs

670 lines
23 KiB
C#

using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol;
namespace AyCode.Services.SignalRs;
/// <summary>
/// Custom SignalR hub protocol using AcBinarySerializer for wire format.
/// Eliminates JSON+Base64 overhead by serializing all HubMessages directly to binary.
///
/// Wire format per message:
/// [4 bytes: payload length (little-endian)] [payload bytes]
///
/// Payload structure:
/// [1 byte: message type] [message-specific fields serialized via AcBinary]
///
/// Message types map 1:1 to SignalR HubMessageType values.
/// Arguments are serialized individually with an INT32 length prefix each,
/// enabling deferred deserialization via IHubProtocol's binder pattern.
///
/// Write path: BufferWriterBinaryOutput for zero virtual dispatch on the hot path.
/// Argument payloads serialized directly to the pipe via AcBinarySerializer (zero-copy write).
///
/// Read path: SequenceReader&lt;byte&gt; reads directly from the pipe's ReadOnlySequence.
/// Argument deserialization uses the pipe's backing byte[] via TryGetArray (zero-copy read).
/// </summary>
public class AcBinaryHubProtocol : IHubProtocol
{
private const int LengthPrefixSize = 4;
// Message type markers (matching HubMessageType enum values)
private const byte MsgInvocation = 1;
private const byte MsgStreamItem = 2;
private const byte MsgCompletion = 3;
private const byte MsgStreamInvocation = 4;
private const byte MsgCancelInvocation = 5;
private const byte MsgPing = 6;
private const byte MsgClose = 7;
private const byte MsgAck = 8;
private const byte MsgSequence = 9;
protected volatile AcBinarySerializerOptions _options;
/// <summary>
/// Parsed SignalParams from current message (arg[2]).
/// Used by ReadSingleArgument (arg[3]) for type-aware deserialization.
/// Thread-safe: SignalR processes messages sequentially per connection.
/// </summary>
private SignalParams? _currentSignalParams;
public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
public AcBinaryHubProtocol(AcBinarySerializerOptions options)
{
_options = options;
_options.BufferWriterChunkSize = 4096;
}
/// <summary>
/// Runtime-replaceable serializer options.
/// Thread-safe: uses volatile field, callers see the new options on next message.
/// </summary>
public AcBinarySerializerOptions Options
{
get => _options;
set => _options = value;
}
public string Name => "acbinary";
public int Version => 1;
public TransferFormat TransferFormat => TransferFormat.Binary;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsVersionSupported(int version) => version <= Version;
#region WriteMessage
public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
{
// +LengthPrefixSize: prevents ArrayBufferWriter resize on first GetMemory,
// which would invalidate the length prefix span obtained before Advance.
var writer = new ArrayBufferWriter<byte>(_options.BufferWriterChunkSize + LengthPrefixSize);
WriteMessage(message, writer);
return writer.WrittenMemory;
}
public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
{
// Reserve outer length prefix directly on the pipe (before BWO takes over)
var lengthSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize);
var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize);
int externalBytes = 0;
switch (message)
{
case InvocationMessage m:
WriteInvocation(ref bw, output, m, ref externalBytes);
break;
case StreamInvocationMessage m:
WriteStreamInvocation(ref bw, output, m, ref externalBytes);
break;
case StreamItemMessage m:
WriteStreamItem(ref bw, output, m, ref externalBytes);
break;
case CompletionMessage m:
WriteCompletion(ref bw, output, m, ref externalBytes);
break;
case CancelInvocationMessage m:
WriteCancelInvocation(ref bw, m);
break;
case PingMessage:
bw.WriteByte(MsgPing);
break;
case CloseMessage m:
WriteClose(ref bw, m);
break;
case AckMessage m:
bw.WriteByte(MsgAck);
bw.WriteRaw(m.SequenceId);
break;
case SequenceMessage m:
bw.WriteByte(MsgSequence);
bw.WriteRaw(m.SequenceId);
break;
default:
throw new HubException($"Unexpected message type: {message.GetType().Name}");
}
var totalPayload = bw.Position + externalBytes;
bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
}
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgInvocation);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamInvocationMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamInvocation);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamItemMessage m, ref int externalBytes)
{
bw.WriteByte(MsgStreamItem);
bw.WriteStringUtf8(m.InvocationId!);
WriteArgument(ref bw, output, m.Item, ref externalBytes);
WriteHeaders(ref bw, m.Headers);
}
private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, CompletionMessage m, ref int externalBytes)
{
bw.WriteByte(MsgCompletion);
bw.WriteStringUtf8(m.InvocationId!);
WriteNullableString(ref bw, m.Error);
var hasResult = m.HasResult;
bw.WriteByte(hasResult ? (byte)1 : (byte)0);
if (hasResult)
WriteArgument(ref bw, output, m.Result, ref externalBytes);
WriteHeaders(ref bw, m.Headers);
}
private static void WriteCancelInvocation(ref BufferWriterBinaryOutput bw, CancelInvocationMessage m)
{
bw.WriteByte(MsgCancelInvocation);
bw.WriteStringUtf8(m.InvocationId!);
WriteHeaders(ref bw, m.Headers);
}
private static void WriteClose(ref BufferWriterBinaryOutput bw, CloseMessage m)
{
bw.WriteByte(MsgClose);
WriteNullableString(ref bw, m.Error);
bw.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0);
}
#endregion
#region TryParseMessage
public virtual bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message)
{
message = null;
var reader = new SequenceReader<byte>(input);
if (!reader.TryReadLittleEndian(out int payloadLength))
return false;
if (reader.Remaining < payloadLength)
return false;
_currentSignalParams = null;
message = ParseMessage(ref reader, payloadLength, binder);
input = input.Slice(LengthPrefixSize + payloadLength);
return message != null;
}
private HubMessage? ParseMessage(ref SequenceReader<byte> r, int payloadLength, IInvocationBinder binder)
{
if (payloadLength == 0)
return null;
// Mark end position so Parse* methods can check Remaining relative to payload
var payloadEnd = r.Consumed + payloadLength;
r.TryRead(out byte msgType);
return msgType switch
{
MsgInvocation => ParseInvocation(ref r, binder),
MsgStreamInvocation => ParseStreamInvocation(ref r, binder),
MsgStreamItem => ParseStreamItem(ref r, binder),
MsgCompletion => ParseCompletion(ref r, binder),
MsgCancelInvocation => ParseCancelInvocation(ref r),
MsgPing => PingMessage.Instance,
MsgClose => ParseClose(ref r),
MsgAck => new AckMessage(ReadInt64(ref r)),
MsgSequence => new SequenceMessage(ReadInt64(ref r)),
_ => null
};
}
/// <summary>
/// Diagnostic logger for protocol-level debugging.
/// Set to non-null to log target method, arg count, param types during ParseInvocation.
/// </summary>
public static Action<string>? DiagnosticLogger { get; set; }
[Conditional("DEBUG")]
private static void LogDiagnostic(string message) => DiagnosticLogger?.Invoke(message);
[Conditional("DEBUG")]
private static void LogReadSingleArgument(ReadOnlySequence<byte> argSlice, int argLength, Type targetType)
{
if (DiagnosticLogger == null) return;
var segmentCount = 0;
foreach (var _ in argSlice)
segmentCount++;
DiagnosticLogger($"[AcBinaryHubProtocol] ReadSingleArgument: argLength={argLength}, isSingleSegment={argSlice.IsSingleSegment}, segments={segmentCount}, type={targetType.Name}");
}
[Conditional("DEBUG")]
private static void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, long remaining)
{
if (DiagnosticLogger == null) return;
var typeNames = new string[paramTypes.Count];
for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name;
DiagnosticLogger($"[AcBinaryHubProtocol] ParseInvocation target='{target}'; paramTypes.Count={paramTypes.Count}; types=[{string.Join(", ", typeNames)}]; remaining={remaining}");
}
private HubMessage ParseInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
{
var invocationId = ReadNullableString(ref r);
var target = ReadString(ref r);
var paramTypes = binder.GetParameterTypes(target);
LogParseInvocation(target, paramTypes, r.Remaining);
var args = ReadArguments(ref r, paramTypes);
var streamIds = ReadStringArray(ref r);
var headers = ReadHeaders(ref r);
var msg = streamIds is { Length: > 0 }
? new InvocationMessage(invocationId, target, args, streamIds)
: ApplyInvocationId(new InvocationMessage(target, args), invocationId);
if (headers != null)
SetHeaders(msg, headers);
return msg;
}
private HubMessage ParseStreamInvocation(ref SequenceReader<byte> r, IInvocationBinder binder)
{
var invocationId = ReadString(ref r);
var target = ReadString(ref r);
var paramTypes = binder.GetParameterTypes(target);
var args = ReadArguments(ref r, paramTypes);
var streamIds = ReadStringArray(ref r);
var headers = ReadHeaders(ref r);
var msg = new StreamInvocationMessage(invocationId, target, args, streamIds);
if (headers != null)
SetHeaders(msg, headers);
return msg;
}
private HubMessage ParseStreamItem(ref SequenceReader<byte> r, IInvocationBinder binder)
{
var invocationId = ReadString(ref r);
var itemType = binder.GetStreamItemType(invocationId);
var item = ReadSingleArgument(ref r, itemType);
var headers = ReadHeaders(ref r);
var msg = new StreamItemMessage(invocationId, item);
if (headers != null)
SetHeaders(msg, headers);
return msg;
}
private HubMessage ParseCompletion(ref SequenceReader<byte> r, IInvocationBinder binder)
{
var invocationId = ReadString(ref r);
var error = ReadNullableString(ref r);
r.TryRead(out byte hasResultByte);
var hasResult = hasResultByte == 1;
object? result = null;
if (hasResult)
{
var resultType = binder.GetReturnType(invocationId);
result = ReadSingleArgument(ref r, resultType);
}
var headers = ReadHeaders(ref r);
CompletionMessage msg;
if (error != null)
msg = CompletionMessage.WithError(invocationId, error);
else if (hasResult)
msg = CompletionMessage.WithResult(invocationId, result);
else
msg = CompletionMessage.Empty(invocationId);
if (headers != null)
SetHeaders(msg, headers);
return msg;
}
private static HubMessage ParseCancelInvocation(ref SequenceReader<byte> r)
{
var invocationId = ReadString(ref r);
var headers = ReadHeaders(ref r);
var msg = new CancelInvocationMessage(invocationId);
if (headers != null)
SetHeaders(msg, headers);
return msg;
}
private static HubMessage ParseClose(ref SequenceReader<byte> r)
{
var error = ReadNullableString(ref r);
r.TryRead(out byte reconnectByte);
var allowReconnect = reconnectByte == 1;
return new CloseMessage(error, allowReconnect);
}
#endregion
#region Argument Serialization
private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes)
{
bw.WriteVarUInt((uint)arguments.Length);
for (var i = 0; i < arguments.Length; i++)
WriteArgument(ref bw, output, arguments[i], ref externalBytes);
}
private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes)
{
if (value is byte[] byteArray)
{
// byte[] fast-path: size known upfront, write entirely through BWO
var argPayload = 1 + VarUIntSize((uint)byteArray.Length) + byteArray.Length;
bw.WriteRaw(argPayload);
bw.WriteByte(BinaryTypeCode.ByteArray);
bw.WriteVarUInt((uint)byteArray.Length);
bw.WriteBytes(byteArray);
return;
}
// Flush BWO to pipe, then serialize directly to the pipe via AcBinarySerializer
bw.FlushAndReset();
// Reserve arg length prefix directly on the pipe
var argLenSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize);
var argBytes = AcBinarySerializer.Serialize(value, output, _options);
Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes);
externalBytes += LengthPrefixSize + argBytes;
}
private object?[] ReadArguments(ref SequenceReader<byte> r, IReadOnlyList<Type> paramTypes)
{
var count = (int)ReadVarUInt(ref r);
LogDiagnostic($"[AcBinaryHubProtocol] ReadArguments count={count}; remaining={r.Remaining}");
var args = new object?[count];
for (var i = 0; i < count; i++)
{
var targetType = i < paramTypes.Count ? paramTypes[i] : typeof(object);
LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}");
args[i] = ReadSingleArgument(ref r, targetType);
// Capture parsed SignalParams for type-aware deserialization of subsequent args
if (args[i] is SignalParams sp)
_currentSignalParams = sp;
}
return args;
}
/// <summary>
/// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer.
/// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[].
/// SignalDataType enables eager deserialization of response data to the server's actual type.
/// </summary>
private object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
{
r.TryReadLittleEndian(out int argLength);
if (argLength == 0)
return null;
// Null marker check
if (argLength == 1)
{
r.TryPeek(out byte marker);
if (marker == 0) { r.Advance(1); return null; }
}
// Slice argument from pipe sequence — zero-copy reference
var argSlice = r.UnreadSequence.Slice(0, argLength);
r.Advance(argLength);
LogReadSingleArgument(argSlice, argLength, targetType);
// byte[] fast-path: first byte is BinaryTypeCode.ByteArray tag →
// strip tag + VarUInt length prefix, return raw payload. No deserializer.
var argReader = new SequenceReader<byte>(argSlice);
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
{
argReader.Advance(1); // skip tag
var payloadLength = (int)ReadVarUInt(ref argReader);
return SequenceToByteArray(argReader.UnreadSequence.Slice(0, payloadLength));
}
// IsRawBytesData: server serialized the object normally (no byte[] fast-path on write).
// argSlice IS the serialized data. Return it as raw byte[] — no deserialization.
// Consumer (e.g. DataSource.PopulateMerge) deserializes it.
if (_currentSignalParams is { IsRawBytesData: true })
return SequenceToByteArray(argSlice);
if (targetType == typeof(object) && _currentSignalParams != null)
{
// Typed response: resolve actual type from SignalDataType for eager deserialization
if (_currentSignalParams.SignalDataType != null)
{
var dataType = Type.GetType(_currentSignalParams.SignalDataType);
if (dataType != null)
targetType = dataType;
}
}
return DeserializeFromSequence(argSlice, targetType, _options);
}
/// <summary>
/// Returns raw byte[] from the pipe sequence without any deserialization.
/// Zero-copy when single-segment (TryGetArray), copies only for rare multi-segment.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
{
if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg)
&& seg.Offset == 0 && seg.Count == seg.Array!.Length)
return seg.Array;
return data.ToArray();
}
/// <summary>
/// Deserializes from a ReadOnlySequence via AcBinaryDeserializer.
/// Single-segment: zero-copy via ArrayBinaryInput. Multi-segment: SequenceBinaryInput (no copy).
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static object? DeserializeFromSequence(ReadOnlySequence<byte> data, Type targetType, AcBinarySerializerOptions options)
=> AcBinaryDeserializer.Deserialize(data, targetType, options);
#endregion
#region Write Framing Helpers
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value)
{
if (value == null)
{
bw.WriteByte(0);
return;
}
bw.WriteByte(1);
bw.WriteStringUtf8(value);
}
private static void WriteStringArray(ref BufferWriterBinaryOutput bw, string[]? array)
{
if (array == null || array.Length == 0)
{
bw.WriteVarUInt(0);
return;
}
bw.WriteVarUInt((uint)array.Length);
for (var i = 0; i < array.Length; i++)
bw.WriteStringUtf8(array[i]);
}
private static void WriteHeaders(ref BufferWriterBinaryOutput bw, IDictionary<string, string>? headers)
{
if (headers == null || headers.Count == 0)
{
bw.WriteVarUInt(0);
return;
}
bw.WriteVarUInt((uint)headers.Count);
foreach (var kv in headers)
{
bw.WriteStringUtf8(kv.Key);
bw.WriteStringUtf8(kv.Value);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int VarUIntSize(uint value)
{
if (value < 0x80) return 1;
if (value < 0x4000) return 2;
if (value < 0x200000) return 3;
if (value < 0x10000000) return 4;
return 5;
}
#endregion
#region Sequence Read Helpers
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static long ReadInt64(ref SequenceReader<byte> r)
{
r.TryReadLittleEndian(out long v);
return v;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static uint ReadVarUInt(ref SequenceReader<byte> r)
{
uint value = 0;
var shift = 0;
while (r.TryRead(out byte b))
{
value |= (uint)(b & 0x7F) << shift;
if ((b & 0x80) == 0)
return value;
shift += 7;
}
return value;
}
private static string ReadString(ref SequenceReader<byte> r)
{
var byteCount = (int)ReadVarUInt(ref r);
if (byteCount == 0)
return string.Empty;
r.TryReadExact(byteCount, out var bytes);
return bytes.IsSingleSegment
? Encoding.UTF8.GetString(bytes.FirstSpan)
: Encoding.UTF8.GetString(bytes.ToArray());
}
private static string? ReadNullableString(ref SequenceReader<byte> r)
{
r.TryRead(out byte marker);
return marker == 0 ? null : ReadString(ref r);
}
private static string[]? ReadStringArray(ref SequenceReader<byte> r)
{
var count = (int)ReadVarUInt(ref r);
if (count == 0)
return null;
var array = new string[count];
for (var i = 0; i < count; i++)
array[i] = ReadString(ref r);
return array;
}
private static Dictionary<string, string>? ReadHeaders(ref SequenceReader<byte> r)
{
if (r.Remaining == 0)
return null;
var count = (int)ReadVarUInt(ref r);
if (count == 0)
return null;
var headers = new Dictionary<string, string>(count, StringComparer.Ordinal);
for (var i = 0; i < count; i++)
{
var key = ReadString(ref r);
var value = ReadString(ref r);
headers[key] = value;
}
return headers;
}
#endregion
#region Helpers
private static InvocationMessage ApplyInvocationId(InvocationMessage msg, string? invocationId)
{
if (invocationId != null)
return new InvocationMessage(invocationId, msg.Target, msg.Arguments);
return msg;
}
private static void SetHeaders(HubMessage msg, Dictionary<string, string> headers)
{
if (msg is HubInvocationMessage invMsg)
invMsg.Headers = headers;
}
#endregion
}