Refactor SignalR binary protocol for extensibility
- Move SignalParams-aware deserialization logic from AcBinaryHubProtocol to new AyCodeBinaryHubProtocol, enabling project-specific customization. - Make key deserialization and helper methods in AcBinaryHubProtocol protected and virtual for easier extension. - Improve byte[] handling to distinguish between AcBinary-serialized and raw data. - Remove diagnostic serialization verification from the base protocol. - Update DI registration to use AyCodeBinaryHubProtocol with configurable options. - Adjust client code to support object-based response data and raw byte handling. - Comment out SignalResponseDataMessage diagnostic logger in Program.cs.
This commit is contained in:
parent
3e00876c0f
commit
f825552ae2
|
|
@ -117,6 +117,7 @@ public class TestableSignalRHub2 : AcWebSignalRHubBase<TestSignalRTags, TestLogg
|
|||
IsRawBytesData = isRawBytes
|
||||
};
|
||||
|
||||
//TODO: átrakni a protocl-ra és itt csak beadjuk a protocol-nak! a reader esetbében is olvasáskor!
|
||||
// Protocol round-trip: serialize → multi-segment split → deserialize
|
||||
var invocation = new InvocationMessage(
|
||||
nameof(IAcSignalRHubClient.OnReceiveMessage),
|
||||
|
|
|
|||
|
|
@ -48,13 +48,6 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
|
||||
protected volatile AcBinarySerializerOptions _options;
|
||||
|
||||
/// <summary>
|
||||
/// Parsed SignalParams from current message (arg[2]).
|
||||
/// Used by ReadSingleArgument (arg[3]) for type-aware deserialization.
|
||||
/// Thread-safe: SignalR processes messages sequentially per connection.
|
||||
/// </summary>
|
||||
private SignalParams? _currentSignalParams;
|
||||
|
||||
public AcBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
|
||||
|
||||
public AcBinaryHubProtocol(AcBinarySerializerOptions options)
|
||||
|
|
@ -220,7 +213,6 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
if (reader.Remaining < payloadLength)
|
||||
return false;
|
||||
|
||||
_currentSignalParams = null;
|
||||
message = ParseMessage(ref reader, payloadLength, binder);
|
||||
|
||||
input = input.Slice(LengthPrefixSize + payloadLength);
|
||||
|
|
@ -395,14 +387,27 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
|
||||
private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes)
|
||||
{
|
||||
// byte[] fast-path: size known upfront, write entirely through BWO
|
||||
if (value is byte[] byteArray)
|
||||
{
|
||||
var isAcBinary = byteArray.Length >= 2
|
||||
&& byteArray[0] == AcBinarySerializerOptions.FormatVersion
|
||||
&& (byteArray[1] & 0xF0) == BinaryTypeCode.HeaderFlagsBase;
|
||||
|
||||
// byte[] fast-path: size known upfront, write entirely through BWO
|
||||
if (isAcBinary)
|
||||
{
|
||||
// Already AcBinary-serialized: write raw length + bytes, no tag wrapper
|
||||
bw.WriteRaw(byteArray.Length);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Raw byte[] (image, file, etc.): wrap with ByteArray tag + VarUInt length
|
||||
var argPayload = 1 + VarUIntSize((uint)byteArray.Length) + byteArray.Length;
|
||||
bw.WriteRaw(argPayload);
|
||||
bw.WriteByte(BinaryTypeCode.ByteArray);
|
||||
bw.WriteVarUInt((uint)byteArray.Length);
|
||||
}
|
||||
|
||||
bw.WriteBytes(byteArray);
|
||||
return;
|
||||
}
|
||||
|
|
@ -415,7 +420,6 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
output.Advance(LengthPrefixSize);
|
||||
|
||||
var argBytes = AcBinarySerializer.Serialize(value, output, _options);
|
||||
VerifyBwoAgainstArray(value, _options, argBytes);
|
||||
|
||||
Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes);
|
||||
externalBytes += LengthPrefixSize + argBytes;
|
||||
|
|
@ -436,21 +440,20 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}");
|
||||
|
||||
args[i] = ReadSingleArgument(ref r, targetType);
|
||||
|
||||
// Capture parsed SignalParams for type-aware deserialization of subsequent args
|
||||
if (args[i] is SignalParams sp)
|
||||
_currentSignalParams = sp;
|
||||
OnArgumentRead(args[i], i);
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
protected virtual void OnArgumentRead(object? value, int index) { }
|
||||
|
||||
/// <summary>
|
||||
/// Reads a length-prefixed argument and deserializes it from the pipe's backing buffer.
|
||||
/// Zero-copy: SequenceReader slices the pipe's own memory, TryGetArray gives the backing byte[].
|
||||
/// SignalDataType enables eager deserialization of response data to the server's actual type.
|
||||
/// </summary>
|
||||
private object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
|
||||
protected virtual object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
|
||||
{
|
||||
r.TryReadLittleEndian(out int argLength);
|
||||
if (argLength == 0)
|
||||
|
|
@ -479,23 +482,6 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
return SequenceToByteArray(argReader.UnreadSequence.Slice(0, payloadLength));
|
||||
}
|
||||
|
||||
// IsRawBytesData: server serialized the object normally (no byte[] fast-path on write).
|
||||
// argSlice IS the serialized data. Return it as raw byte[] — no deserialization.
|
||||
// Consumer (e.g. DataSource.PopulateMerge) deserializes it.
|
||||
if (_currentSignalParams is { IsRawBytesData: true })
|
||||
return SequenceToByteArray(argSlice);
|
||||
|
||||
if (targetType == typeof(object) && _currentSignalParams != null)
|
||||
{
|
||||
// Typed response: resolve actual type from SignalDataType for eager deserialization
|
||||
if (_currentSignalParams.SignalDataType != null)
|
||||
{
|
||||
var dataType = Type.GetType(_currentSignalParams.SignalDataType);
|
||||
if (dataType != null)
|
||||
targetType = dataType;
|
||||
}
|
||||
}
|
||||
|
||||
return DeserializeFromSequence(argSlice, targetType, _options);
|
||||
}
|
||||
|
||||
|
|
@ -504,7 +490,7 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
/// Zero-copy when single-segment (TryGetArray), copies only for rare multi-segment.
|
||||
/// </summary>
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
|
||||
protected static byte[] SequenceToByteArray(ReadOnlySequence<byte> data)
|
||||
{
|
||||
if (data.IsSingleSegment && MemoryMarshal.TryGetArray(data.First, out var seg)
|
||||
&& seg.Offset == 0 && seg.Count == seg.Array!.Length)
|
||||
|
|
@ -518,7 +504,7 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
/// Single-segment: zero-copy via ArrayBinaryInput. Multi-segment: SequenceBinaryInput (no copy).
|
||||
/// </summary>
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static object? DeserializeFromSequence(ReadOnlySequence<byte> data, Type targetType, AcBinarySerializerOptions options)
|
||||
protected static object? DeserializeFromSequence(ReadOnlySequence<byte> data, Type targetType, AcBinarySerializerOptions options)
|
||||
=> AcBinaryDeserializer.Deserialize(data, targetType, options);
|
||||
|
||||
#endregion
|
||||
|
|
@ -586,7 +572,7 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static uint ReadVarUInt(ref SequenceReader<byte> r)
|
||||
protected static uint ReadVarUInt(ref SequenceReader<byte> r)
|
||||
{
|
||||
uint value = 0;
|
||||
var shift = 0;
|
||||
|
|
@ -652,72 +638,6 @@ public class AcBinaryHubProtocol : IHubProtocol
|
|||
|
||||
#endregion
|
||||
|
||||
#region Diagnostics
|
||||
|
||||
[Conditional("DEBUG")]
|
||||
private static void VerifyBwoAgainstArray(object? value, AcBinarySerializerOptions options, int bwoBytes)
|
||||
{
|
||||
if (DiagnosticLogger == null) return;
|
||||
|
||||
// Reference serialization via byte[] path
|
||||
byte[] referenceBytes;
|
||||
try
|
||||
{
|
||||
referenceBytes = AcBinarySerializer.Serialize(value, options);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
DiagnosticLogger($"[VERIFY_WRITE] Array serialize FAILED for {value?.GetType().Name}: {ex.Message}");
|
||||
return;
|
||||
}
|
||||
|
||||
// BWO serialization to a temp ArrayBufferWriter to get comparable bytes
|
||||
var tempWriter = new ArrayBufferWriter<byte>(Math.Max(Math.Max(referenceBytes.Length, bwoBytes) + 256, 4096));
|
||||
var tempBytes = AcBinarySerializer.Serialize(value, tempWriter, options);
|
||||
|
||||
if (bwoBytes != referenceBytes.Length)
|
||||
{
|
||||
DiagnosticLogger($"[VERIFY_WRITE_MISMATCH] SIZE: Array={referenceBytes.Length}, BWO_pipe={bwoBytes}, BWO_temp={tempBytes} for {value?.GetType().Name}");
|
||||
}
|
||||
|
||||
// Compare temp BWO output against Array reference byte-by-byte
|
||||
var bwoSpan = tempWriter.WrittenSpan;
|
||||
var refSpan = referenceBytes.AsSpan();
|
||||
var minLen = Math.Min(bwoSpan.Length, refSpan.Length);
|
||||
var mismatchCount = 0;
|
||||
for (int i = 0; i < minLen; i++)
|
||||
{
|
||||
if (bwoSpan[i] != refSpan[i])
|
||||
{
|
||||
if (mismatchCount < 5) // log first 5 mismatches
|
||||
{
|
||||
var start = Math.Max(0, i - 8);
|
||||
var end = Math.Min(minLen, i + 16);
|
||||
var refHex = Convert.ToHexString(refSpan.Slice(start, end - start));
|
||||
var bwoHex = Convert.ToHexString(bwoSpan.Slice(start, end - start));
|
||||
DiagnosticLogger($"[VERIFY_WRITE_MISMATCH] CONTENT #{mismatchCount} at byte {i}/{minLen}: ref={refHex} bwo={bwoHex} type={value?.GetType().Name}");
|
||||
}
|
||||
mismatchCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (mismatchCount > 0)
|
||||
{
|
||||
DiagnosticLogger($"[VERIFY_WRITE_MISMATCH] Total {mismatchCount} differing bytes out of {minLen} compared (Array={referenceBytes.Length}, BWO_temp={tempBytes})");
|
||||
return;
|
||||
}
|
||||
|
||||
if (bwoSpan.Length != refSpan.Length)
|
||||
{
|
||||
DiagnosticLogger($"[VERIFY_WRITE_MISMATCH] Bytes match up to {minLen} but lengths differ: Array={referenceBytes.Length}, BWO_temp={tempBytes}");
|
||||
return;
|
||||
}
|
||||
|
||||
DiagnosticLogger($"[VERIFY_WRITE_OK] {referenceBytes.Length} bytes match for {value?.GetType().Name}");
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Helpers
|
||||
|
||||
private static InvocationMessage ApplyInvocationId(InvocationMessage msg, string? invocationId)
|
||||
|
|
|
|||
|
|
@ -70,7 +70,17 @@ namespace AyCode.Services.SignalRs
|
|||
|
||||
if (useAcBinaryProtocol)
|
||||
{
|
||||
hubBuilder.Services.AddSingleton<IHubProtocol, AyCodeBinaryHubProtocol>();
|
||||
hubBuilder.Services.AddSingleton<IHubProtocol>(sp =>
|
||||
{
|
||||
var binaryOptions = AcBinarySerializerOptions.Default;
|
||||
binaryOptions.BufferWriterChunkSize = 4096;
|
||||
|
||||
return new AyCodeBinaryHubProtocol(binaryOptions);
|
||||
});
|
||||
|
||||
//Vagy ha az options-t is DI-ből:
|
||||
//hubBuilder.Services.AddSingleton<IHubProtocol>(sp => new AyCodeBinaryHubProtocol(sp.GetRequiredService<AcBinarySerializerOptions>()));
|
||||
|
||||
AcBinaryHubProtocol.DiagnosticLogger = msg => Logger.Debug(msg);
|
||||
AcBinaryDeserializer.DiagnosticLogger = msg => Logger.Debug(msg);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,13 +1,67 @@
|
|||
using System;
|
||||
using System.Buffers;
|
||||
using AyCode.Core.Serializers.Binaries;
|
||||
|
||||
namespace AyCode.Services.SignalRs;
|
||||
|
||||
/// <summary>
|
||||
/// Project-specific binary protocol.
|
||||
/// Project-specific binary protocol with SignalParams-aware argument deserialization.
|
||||
/// Register this in PluginNopStartup.cs and AcSignalRClientBase instead of AcBinaryHubProtocol.
|
||||
/// </summary>
|
||||
public class AyCodeBinaryHubProtocol : AcBinaryHubProtocol
|
||||
{
|
||||
public AyCodeBinaryHubProtocol() { }
|
||||
/// <summary>
|
||||
/// Parsed SignalParams from current message (arg[2]).
|
||||
/// Used by ReadSingleArgument (arg[3]) for type-aware deserialization.
|
||||
/// Thread-safe: SignalR processes messages sequentially per connection.
|
||||
/// </summary>
|
||||
private SignalParams? _currentSignalParams;
|
||||
|
||||
public AyCodeBinaryHubProtocol() : this(AcBinarySerializerOptions.Default) { }
|
||||
public AyCodeBinaryHubProtocol(AcBinarySerializerOptions options) : base(options) { }
|
||||
|
||||
protected override void OnArgumentRead(object? value, int index)
|
||||
{
|
||||
if (value is SignalParams sp)
|
||||
_currentSignalParams = sp;
|
||||
}
|
||||
|
||||
protected override object? ReadSingleArgument(ref SequenceReader<byte> r, Type targetType)
|
||||
{
|
||||
r.TryReadLittleEndian(out int argLength);
|
||||
if (argLength == 0)
|
||||
return null;
|
||||
|
||||
if (argLength == 1)
|
||||
{
|
||||
r.TryPeek(out byte marker);
|
||||
if (marker == 0) { r.Advance(1); return null; }
|
||||
}
|
||||
|
||||
var argSlice = r.UnreadSequence.Slice(0, argLength);
|
||||
r.Advance(argLength);
|
||||
|
||||
// byte[] fast-path
|
||||
var argReader = new SequenceReader<byte>(argSlice);
|
||||
if (argReader.TryPeek(out byte tag) && tag == BinaryTypeCode.ByteArray)
|
||||
{
|
||||
argReader.Advance(1);
|
||||
var payloadLength = (int)ReadVarUInt(ref argReader);
|
||||
return SequenceToByteArray(argReader.UnreadSequence.Slice(0, payloadLength));
|
||||
}
|
||||
|
||||
// IsRawBytesData: return raw bytes, consumer deserializes
|
||||
if (_currentSignalParams is { IsRawBytesData: true })
|
||||
return SequenceToByteArray(argSlice);
|
||||
|
||||
// SignalDataType: resolve actual type for eager deserialization
|
||||
if (targetType == typeof(object) && _currentSignalParams?.SignalDataType != null)
|
||||
{
|
||||
var dataType = Type.GetType(_currentSignalParams.SignalDataType);
|
||||
if (dataType != null)
|
||||
targetType = dataType;
|
||||
}
|
||||
|
||||
return DeserializeFromSequence(argSlice, targetType, Options);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue