Refactor SignalR: separate metadata and payload transport
Major protocol update: OnReceiveMessage now takes metadata (SignalReceiveParams) and payload (byte[]) as separate hub arguments, not a single envelope. Metadata is AcBinary-serialized; payload uses protocol fast-path. Updated all client/server code, interfaces, and docs. Added ISignalParams and SignalReceiveParams types. Improved AcBinaryHubProtocol diagnostics and made byte[] fast-path more robust. This enables clearer, more debuggable, and future-proof SignalR binary messaging.
This commit is contained in:
parent
f06bd5004d
commit
32018e906a
|
|
@ -46,7 +46,11 @@
|
||||||
"Bash(curl -sL \"https://raw.githubusercontent.com/Cysharp/MemoryPack/main/src/MemoryPack.Core/MemoryPackCode.cs\")",
|
"Bash(curl -sL \"https://raw.githubusercontent.com/Cysharp/MemoryPack/main/src/MemoryPack.Core/MemoryPackCode.cs\")",
|
||||||
"Bash(curl -sL \"https://raw.githubusercontent.com/Cysharp/MemoryPack/main/src/MemoryPack.Generator/MemoryPackGenerator.Parser.cs\")",
|
"Bash(curl -sL \"https://raw.githubusercontent.com/Cysharp/MemoryPack/main/src/MemoryPack.Generator/MemoryPackGenerator.Parser.cs\")",
|
||||||
"Bash(perl -i -pe 's/GetWrapperBySlot\\\\\\(\\([^,]+\\), \\(typeof\\\\\\([^\\)]+\\\\\\)\\)\\\\\\)/GetWrapper\\($2, $1\\)/g' \"H:/Applications/Aycode/Source/AyCode.Core/AyCode.Core.Serializers.SourceGenerator/AcBinarySourceGenerator.cs\")",
|
"Bash(perl -i -pe 's/GetWrapperBySlot\\\\\\(\\([^,]+\\), \\(typeof\\\\\\([^\\)]+\\\\\\)\\)\\\\\\)/GetWrapper\\($2, $1\\)/g' \"H:/Applications/Aycode/Source/AyCode.Core/AyCode.Core.Serializers.SourceGenerator/AcBinarySourceGenerator.cs\")",
|
||||||
"Bash(wc -l H:/Applications/Aycode/Source/AyCode.Core/AyCode.Core/Serializers/Binaries/*.cs)"
|
"Bash(wc -l H:/Applications/Aycode/Source/AyCode.Core/AyCode.Core/Serializers/Binaries/*.cs)",
|
||||||
|
"Read(//h/Applications/Mango/Source/NopCommerce.Common/4.70/Plugins/Nop.Plugin.Misc.AIPlugin/**)",
|
||||||
|
"Bash(2)",
|
||||||
|
"Bash(dotnet --version)",
|
||||||
|
"WebSearch"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -212,16 +212,16 @@ public class BenchmarkSignalRClient : AcSignalRClientBase, IAcSignalRHubItemServ
|
||||||
public TResponse? GetAllSync<TResponse>(int tag)
|
public TResponse? GetAllSync<TResponse>(int tag)
|
||||||
=> GetAllAsync<TResponse>(tag).GetAwaiter().GetResult();
|
=> GetAllAsync<TResponse>(tag).GetAwaiter().GetResult();
|
||||||
|
|
||||||
protected override Task MessageReceived(int messageTag, byte[] messageBytes) => Task.CompletedTask;
|
protected override Task MessageReceived(int messageTag, SignalReceiveParams receiveParams, byte[] data) => Task.CompletedTask;
|
||||||
protected override HubConnectionState GetConnectionState() => HubConnectionState.Connected;
|
protected override HubConnectionState GetConnectionState() => HubConnectionState.Connected;
|
||||||
protected override bool IsConnected() => true;
|
protected override bool IsConnected() => true;
|
||||||
protected override Task StartConnectionInternal() => Task.CompletedTask;
|
protected override Task StartConnectionInternal() => Task.CompletedTask;
|
||||||
protected override Task StopConnectionInternal() => Task.CompletedTask;
|
protected override Task StopConnectionInternal() => Task.CompletedTask;
|
||||||
protected override ValueTask DisposeConnectionInternal() => ValueTask.CompletedTask;
|
protected override ValueTask DisposeConnectionInternal() => ValueTask.CompletedTask;
|
||||||
|
|
||||||
protected override async Task SendToHubAsync(int messageTag, byte[]? messageBytes, int? requestId)
|
protected override async Task SendToHubAsync(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[]? messageBytes)
|
||||||
{
|
{
|
||||||
await _hub.OnReceiveMessage(messageTag, messageBytes, requestId);
|
await _hub.OnReceiveMessage(messageTag, requestId, receiveParams, messageBytes ?? []);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ public class TestableSignalRClient2 : AcSignalRClientBase, IAcSignalRHubItemServ
|
||||||
|
|
||||||
#region Override virtual methods for testing
|
#region Override virtual methods for testing
|
||||||
|
|
||||||
protected override async Task MessageReceived(int messageTag, byte[] messageBytes)
|
protected override async Task MessageReceived(int messageTag, SignalReceiveParams receiveParams, byte[] data)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
@ -52,9 +52,9 @@ public class TestableSignalRClient2 : AcSignalRClientBase, IAcSignalRHubItemServ
|
||||||
|
|
||||||
protected override ValueTask DisposeConnectionInternal() => ValueTask.CompletedTask;
|
protected override ValueTask DisposeConnectionInternal() => ValueTask.CompletedTask;
|
||||||
|
|
||||||
protected override async Task SendToHubAsync(int messageTag, byte[]? messageBytes, int? requestId)
|
protected override async Task SendToHubAsync(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[]? messageBytes)
|
||||||
{
|
{
|
||||||
await _signalRHub.OnReceiveMessage(messageTag, messageBytes, requestId);
|
await _signalRHub.OnReceiveMessage(messageTag, requestId, receiveParams, messageBytes ?? []);
|
||||||
}
|
}
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,11 @@ public abstract class AcSignalRSendToClientService<TSignalRHub, TSignalRTags, TL
|
||||||
|
|
||||||
protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, object? content)
|
protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, object? content)
|
||||||
{
|
{
|
||||||
var response = new SignalResponseDataMessage(messageTag, SignalResponseStatus.Success, content, AcBinarySerializerOptions.Default);
|
var responseData = SignalRSerializationHelper.CreateResponseData(content, AcBinarySerializerOptions.Default) ?? [];
|
||||||
var responseBytes = response.ToBinary();
|
var receiveParams = new SignalReceiveParams { Status = SignalResponseStatus.Success };
|
||||||
|
|
||||||
Logger.Info($"[{responseBytes.Length / 1024}kb] Server sending to client; {ConstHelper.NameByValue<TSignalRTags>(messageTag)}");
|
Logger.Info($"[{responseData.Length / 1024}kb] Server sending to client; {ConstHelper.NameByValue<TSignalRTags>(messageTag)}");
|
||||||
await sendTo.OnReceiveMessage(messageTag, responseBytes, null);
|
await sendTo.OnReceiveMessage(messageTag, null, receiveParams, responseData);
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual Task SendMessageToAllClients(int messageTag, object? content)
|
public virtual Task SendMessageToAllClients(int messageTag, object? content)
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,10 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
|
||||||
|
|
||||||
public override async Task OnConnectedAsync()
|
public override async Task OnConnectedAsync()
|
||||||
{
|
{
|
||||||
|
// Enable protocol diagnostics to debug deserialization issues
|
||||||
|
if (EnableBinaryDiagnostics)
|
||||||
|
AcBinaryHubProtocol.DiagnosticLogger ??= msg => Logger.Info(msg);
|
||||||
|
|
||||||
Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}");
|
Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}");
|
||||||
LogContextUserNameAndId();
|
LogContextUserNameAndId();
|
||||||
await base.OnConnectedAsync();
|
await base.OnConnectedAsync();
|
||||||
|
|
@ -60,9 +64,9 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
|
||||||
|
|
||||||
#region Message Processing
|
#region Message Processing
|
||||||
|
|
||||||
public virtual Task OnReceiveMessage(int messageTag, byte[]? messageBytes, int? requestId)
|
public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[] data)
|
||||||
{
|
{
|
||||||
return ProcessOnReceiveMessage(messageTag, messageBytes, requestId, null);
|
return ProcessOnReceiveMessage(messageTag, data, requestId, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual IAsyncEnumerable<byte[]> OnReceiveStreamMessage(int messageTag, byte[]? messageBytes)
|
public virtual IAsyncEnumerable<byte[]> OnReceiveStreamMessage(int messageTag, byte[]? messageBytes)
|
||||||
|
|
@ -542,13 +546,15 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
|
||||||
/// </summary>
|
/// </summary>
|
||||||
protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, ISignalRMessage message, int? requestId = null)
|
protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, ISignalRMessage message, int? requestId = null)
|
||||||
{
|
{
|
||||||
var responseBytes = SignalRSerializationHelper.SerializeToBinary(message);
|
var responseMessage = (SignalResponseDataMessage)message;
|
||||||
|
var receiveParams = new SignalReceiveParams { Status = responseMessage.Status };
|
||||||
|
var responseData = responseMessage.ResponseData ?? [];
|
||||||
|
|
||||||
var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);
|
var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);
|
||||||
|
|
||||||
Logger.Debug($"[{responseBytes.Length / 1024}kb] Server sending message to client; requestId: {requestId}; Aborted: {IsConnectionAborted()}; ConnectionId: {GetConnectionId()}; {tagName}");
|
Logger.Debug($"[{responseData.Length / 1024}kb] Server sending message to client; requestId: {requestId}; Aborted: {IsConnectionAborted()}; ConnectionId: {GetConnectionId()}; {tagName}");
|
||||||
|
|
||||||
await sendTo.OnReceiveMessage(messageTag, responseBytes, requestId);
|
await sendTo.OnReceiveMessage(messageTag, requestId, receiveParams, responseData);
|
||||||
|
|
||||||
Logger.Debug($"Server sent message to client; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}");
|
Logger.Debug($"Server sent message to client; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,15 +8,18 @@ Server-side SignalR hub infrastructure: method dispatch, session management, bro
|
||||||
## Server Processing
|
## Server Processing
|
||||||
|
|
||||||
```
|
```
|
||||||
6. OnReceiveMessage(tag, bytes, requestId)
|
6. OnReceiveMessage(tag, requestId, receiveParams, data)
|
||||||
7. DynamicMethodRegistry.GetMethodByMessageTag(tag) ← ConcurrentDictionary lookup
|
7. DynamicMethodRegistry.GetMethodByMessageTag(tag) ← ConcurrentDictionary lookup
|
||||||
8. DeserializeParameters(bytes):
|
8. DeserializeParameters(data):
|
||||||
├─ DeserializeFromBinary<SignalPostJsonMessage>() ← unwrap Binary envelope
|
├─ DeserializeFromBinary<SignalPostJsonMessage>() ← unwrap Binary envelope
|
||||||
├─ IdMessage format? → parse each Ids[i] as JSON per parameter type
|
├─ IdMessage format? → parse each Ids[i] as JSON per parameter type
|
||||||
└─ Complex object? → json.JsonTo(paramType) ⚠️ tech debt: JSON parse
|
└─ Complex object? → json.JsonTo(paramType) ⚠️ tech debt: JSON parse
|
||||||
9. MethodInfo.InvokeMethod(instance, params) ← unwraps Task/ValueTask
|
9. MethodInfo.InvokeMethod(instance, params) ← unwraps Task/ValueTask
|
||||||
10. CreateResponseMessage(tag, Success, result) ← pure Binary serialization
|
10. CreateResponseMessage(tag, Success, result) ← Binary serialize payload → byte[]
|
||||||
11. ResponseToCaller(tag, message, requestId)
|
11. SendMessageToClient(caller, tag, message, requestId):
|
||||||
|
├─ Extract receiveParams { Status } + responseData byte[] from message
|
||||||
|
└─ caller.OnReceiveMessage(tag, requestId, receiveParams, responseData)
|
||||||
|
(metadata + payload as separate args — no envelope serialization)
|
||||||
12. If SendToOtherClientType != None:
|
12. If SendToOtherClientType != None:
|
||||||
└─ SendMessageToOthers(sendToOtherClientTag, result) ← uses sendToOtherClientTag, not messageTag
|
└─ SendMessageToOthers(sendToOtherClientTag, result) ← uses sendToOtherClientTag, not messageTag
|
||||||
```
|
```
|
||||||
|
|
@ -28,7 +31,7 @@ See also: `AyCode.Models.Server/DynamicMethods/README.md`
|
||||||
### Server-Side Lookup
|
### Server-Side Lookup
|
||||||
|
|
||||||
```
|
```
|
||||||
1. OnReceiveMessage(tag=100, bytes, requestId)
|
1. OnReceiveMessage(tag=100, requestId, receiveParams, data)
|
||||||
|
|
||||||
2. DynamicMethodRegistry.GetMethodByMessageTag(100)
|
2. DynamicMethodRegistry.GetMethodByMessageTag(100)
|
||||||
├─ Check static ConcurrentDictionary<int, (Type, AcMethodInfoModel)?> cache
|
├─ Check static ConcurrentDictionary<int, (Type, AcMethodInfoModel)?> cache
|
||||||
|
|
@ -77,7 +80,7 @@ ConcurrentDictionary<TSessionItemId, TSessionItem> Sessions
|
||||||
| `SendMessageToUser(userId)` | User (all connections) |
|
| `SendMessageToUser(userId)` | User (all connections) |
|
||||||
| `SendMessageToUsers(userIds)` | Multiple users |
|
| `SendMessageToUsers(userIds)` | Multiple users |
|
||||||
|
|
||||||
All messages wrapped in `SignalResponseDataMessage` → binary serialized → `OnReceiveMessage`.
|
All messages serialized to `byte[]` payload + `SignalReceiveParams` metadata → sent as separate hub arguments via `OnReceiveMessage` (no envelope wrapping).
|
||||||
|
|
||||||
## Hub Events
|
## Hub Events
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
using System.Buffers;
|
using System.Buffers;
|
||||||
|
using System.Diagnostics;
|
||||||
using System.Diagnostics.CodeAnalysis;
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
|
@ -273,11 +274,32 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Diagnostic logger for protocol-level debugging.
|
||||||
|
/// Set to non-null to log target method, arg count, param types during ParseInvocation.
|
||||||
|
/// </summary>
|
||||||
|
public static Action<string>? DiagnosticLogger { get; set; }
|
||||||
|
|
||||||
|
[Conditional("DEBUG")]
|
||||||
|
private static void LogDiagnostic(string message) => DiagnosticLogger?.Invoke(message);
|
||||||
|
|
||||||
|
[Conditional("DEBUG")]
|
||||||
|
private static void LogParseInvocation(string target, IReadOnlyList<Type> paramTypes, int remaining)
|
||||||
|
{
|
||||||
|
if (DiagnosticLogger == null) return;
|
||||||
|
var typeNames = new string[paramTypes.Count];
|
||||||
|
for (var i = 0; i < paramTypes.Count; i++) typeNames[i] = paramTypes[i].Name;
|
||||||
|
DiagnosticLogger($"[AcBinaryHubProtocol] ParseInvocation target='{target}'; paramTypes.Count={paramTypes.Count}; types=[{string.Join(", ", typeNames)}]; remaining={remaining}");
|
||||||
|
}
|
||||||
|
|
||||||
private HubMessage ParseInvocation(ref SpanReader r, IInvocationBinder binder)
|
private HubMessage ParseInvocation(ref SpanReader r, IInvocationBinder binder)
|
||||||
{
|
{
|
||||||
var invocationId = r.ReadNullableString();
|
var invocationId = r.ReadNullableString();
|
||||||
var target = r.ReadString();
|
var target = r.ReadString();
|
||||||
var paramTypes = binder.GetParameterTypes(target);
|
var paramTypes = binder.GetParameterTypes(target);
|
||||||
|
|
||||||
|
LogParseInvocation(target, paramTypes, r.Remaining);
|
||||||
|
|
||||||
var args = ReadArguments(ref r, paramTypes);
|
var args = ReadArguments(ref r, paramTypes);
|
||||||
var streamIds = r.ReadStringArray();
|
var streamIds = r.ReadStringArray();
|
||||||
var headers = ReadHeaders(ref r);
|
var headers = ReadHeaders(ref r);
|
||||||
|
|
@ -410,11 +432,17 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
|
||||||
private object?[] ReadArguments(ref SpanReader r, IReadOnlyList<Type> paramTypes)
|
private object?[] ReadArguments(ref SpanReader r, IReadOnlyList<Type> paramTypes)
|
||||||
{
|
{
|
||||||
var count = (int)r.ReadVarUInt();
|
var count = (int)r.ReadVarUInt();
|
||||||
|
|
||||||
|
LogDiagnostic($"[AcBinaryHubProtocol] ReadArguments count={count}; remaining={r.Remaining}");
|
||||||
|
|
||||||
var args = new object?[count];
|
var args = new object?[count];
|
||||||
|
|
||||||
for (var i = 0; i < count; i++)
|
for (var i = 0; i < count; i++)
|
||||||
{
|
{
|
||||||
var targetType = i < paramTypes.Count ? paramTypes[i] : typeof(object);
|
var targetType = i < paramTypes.Count ? paramTypes[i] : typeof(object);
|
||||||
|
|
||||||
|
LogDiagnostic($"[AcBinaryHubProtocol] arg[{i}] targetType={targetType.Name}; remaining={r.Remaining}");
|
||||||
|
|
||||||
args[i] = ReadSingleArgument(ref r, targetType);
|
args[i] = ReadSingleArgument(ref r, targetType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -432,8 +460,12 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
|
||||||
if (argLength == 1 && argSpan[0] == 0)
|
if (argLength == 1 && argSpan[0] == 0)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
// byte[] fast-path: bypass deserializer engine
|
// byte[] fast-path: bypass deserializer engine.
|
||||||
if (targetType == typeof(byte[]) && argSpan.Length > 0 && argSpan[0] == BinaryTypeCode.ByteArray)
|
// Check wire format only — ByteArray marker (0x44) is unambiguous:
|
||||||
|
// no AcBinary-serialized object starts with it (they start with version=1).
|
||||||
|
// Removing the targetType check makes the protocol robust against
|
||||||
|
// client/server argument order mismatches for byte[] arguments.
|
||||||
|
if (argSpan.Length > 0 && argSpan[0] == BinaryTypeCode.ByteArray)
|
||||||
{
|
{
|
||||||
var byteReader = new SpanReader(argSpan.Slice(1));
|
var byteReader = new SpanReader(argSpan.Slice(1));
|
||||||
var len = (int)byteReader.ReadVarUInt();
|
var len = (int)byteReader.ReadVarUInt();
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ namespace AyCode.Services.SignalRs
|
||||||
protected readonly HubConnection? HubConnection;
|
protected readonly HubConnection? HubConnection;
|
||||||
protected readonly AcLoggerBase Logger;
|
protected readonly AcLoggerBase Logger;
|
||||||
|
|
||||||
protected abstract Task MessageReceived(int messageTag, byte[] messageBytes);
|
protected abstract Task MessageReceived(int messageTag, SignalReceiveParams receiveParams, byte[] data);
|
||||||
|
|
||||||
public int MsDelay = 25;
|
public int MsDelay = 25;
|
||||||
public int MsFirstDelay = 50;
|
public int MsFirstDelay = 50;
|
||||||
|
|
@ -70,7 +70,7 @@ namespace AyCode.Services.SignalRs
|
||||||
HubConnection = hubBuilder.Build();
|
HubConnection = hubBuilder.Build();
|
||||||
|
|
||||||
HubConnection.Closed += HubConnection_Closed;
|
HubConnection.Closed += HubConnection_Closed;
|
||||||
_ = HubConnection.On<int, byte[], int?>(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage);
|
_ = HubConnection.On<int, int?, SignalReceiveParams, byte[]>(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AcSignalRClientBase(AcLoggerBase logger)
|
protected AcSignalRClientBase(AcLoggerBase logger)
|
||||||
|
|
@ -105,8 +105,8 @@ namespace AyCode.Services.SignalRs
|
||||||
protected virtual ValueTask DisposeConnectionInternal()
|
protected virtual ValueTask DisposeConnectionInternal()
|
||||||
=> HubConnection?.DisposeAsync() ?? ValueTask.CompletedTask;
|
=> HubConnection?.DisposeAsync() ?? ValueTask.CompletedTask;
|
||||||
|
|
||||||
protected virtual Task SendToHubAsync(int messageTag, byte[]? messageBytes, int? requestId)
|
protected virtual Task SendToHubAsync(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[]? messageBytes)
|
||||||
=> HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, messageBytes, requestId) ?? Task.CompletedTask;
|
=> HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, requestId, receiveParams, messageBytes) ?? Task.CompletedTask;
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
@ -150,7 +150,8 @@ namespace AyCode.Services.SignalRs
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await SendToHubAsync(messageTag, msgBytes, requestId);
|
var receiveParams = new SignalReceiveParams { Status = SignalResponseStatus.Success };
|
||||||
|
await SendToHubAsync(messageTag, requestId, receiveParams, msgBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
#region CRUD
|
#region CRUD
|
||||||
|
|
@ -419,11 +420,11 @@ namespace AyCode.Services.SignalRs
|
||||||
|
|
||||||
protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32;
|
protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32;
|
||||||
|
|
||||||
public virtual Task OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId)
|
public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[] data)
|
||||||
{
|
{
|
||||||
var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}";
|
var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}";
|
||||||
|
|
||||||
if (messageBytes.Length == 0) Logger.Warning($"message.Length == 0! {logText}");
|
if (data.Length == 0) Logger.Warning($"data.Length == 0! {logText}");
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
@ -431,12 +432,18 @@ namespace AyCode.Services.SignalRs
|
||||||
{
|
{
|
||||||
var reqId = requestId.Value;
|
var reqId = requestId.Value;
|
||||||
requestModel.ResponseDateTime = DateTime.UtcNow;
|
requestModel.ResponseDateTime = DateTime.UtcNow;
|
||||||
Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms][{messageBytes.Length / 1024}kb]{logText}");
|
Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms][{data.Length / 1024}kb]{logText}");
|
||||||
|
|
||||||
// Diagnostic logging for binary deserialization debugging
|
// Diagnostic logging for binary deserialization debugging
|
||||||
LogBinaryDiagnostics(messageTag, messageBytes, requestId);
|
LogBinaryDiagnostics(messageTag, data, requestId);
|
||||||
|
|
||||||
var responseMessage = SignalRSerializationHelper.DeserializeFromBinary<SignalResponseDataMessage>(messageBytes) ?? new SignalResponseDataMessage();
|
// No envelope deserialization — construct directly from params + data
|
||||||
|
var responseMessage = new SignalResponseDataMessage
|
||||||
|
{
|
||||||
|
Status = receiveParams.Status,
|
||||||
|
DataSerializerType = AcSerializerType.Binary,
|
||||||
|
ResponseData = data
|
||||||
|
};
|
||||||
|
|
||||||
switch (requestModel.ResponseByRequestId)
|
switch (requestModel.ResponseByRequestId)
|
||||||
{
|
{
|
||||||
|
|
@ -467,14 +474,14 @@ namespace AyCode.Services.SignalRs
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger.Info(logText);
|
Logger.Info(logText);
|
||||||
MessageReceived(messageTag, messageBytes).Forget();
|
MessageReceived(messageTag, receiveParams, data).Forget();
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Enhanced error logging with binary diagnostics
|
// Enhanced error logging with binary diagnostics
|
||||||
if (messageBytes.Length > 0)
|
if (data.Length > 0)
|
||||||
{
|
{
|
||||||
LogBinaryDiagnosticsOnError(messageTag, messageBytes, requestId, ex);
|
LogBinaryDiagnosticsOnError(messageTag, data, requestId, ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requestId.HasValue && _responseByRequestId.TryRemove(requestId.Value, out var exModel))
|
if (requestId.HasValue && _responseByRequestId.TryRemove(requestId.Value, out var exModel))
|
||||||
|
|
|
||||||
|
|
@ -3,5 +3,5 @@
|
||||||
public interface IAcSignalRHubBase
|
public interface IAcSignalRHubBase
|
||||||
{
|
{
|
||||||
//Task OnRequestMessage(int messageTag, int requestId);
|
//Task OnRequestMessage(int messageTag, int requestId);
|
||||||
Task OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId);
|
Task OnReceiveMessage(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[] data);
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
using AyCode.Core.Serializers.Attributes;
|
||||||
|
|
||||||
|
namespace AyCode.Services.SignalRs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Base interface for SignalR message parameters (metadata).
|
||||||
|
/// </summary>
|
||||||
|
public interface ISignalParams { }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Parameters received alongside message data.
|
||||||
|
/// Travels as a separate SignalR hub argument (small, AcBinary serialized)
|
||||||
|
/// while the payload byte[] uses the protocol's zero-copy fast-path.
|
||||||
|
/// </summary>
|
||||||
|
[AcBinarySerializable]
|
||||||
|
public class SignalReceiveParams : ISignalParams
|
||||||
|
{
|
||||||
|
public SignalResponseStatus Status { get; set; }
|
||||||
|
}
|
||||||
|
|
@ -13,7 +13,8 @@ Custom binary SignalR protocol, client infrastructure, message tagging, and seri
|
||||||
### Client
|
### Client
|
||||||
- **`AcSignalRClientBase.cs`** — Abstract SignalR client managing `HubConnection`, request/response tracking via pooled `SignalRRequestModel`. Methods: `SendMessageToServerAsync<TResponse>()`, CRUD helpers (Post, Get, GetAll, GetAllInto). Configurable timeouts.
|
- **`AcSignalRClientBase.cs`** — Abstract SignalR client managing `HubConnection`, request/response tracking via pooled `SignalRRequestModel`. Methods: `SendMessageToServerAsync<TResponse>()`, CRUD helpers (Post, Get, GetAll, GetAllInto). Configurable timeouts.
|
||||||
- **`IAcSignalRHubClient.cs`** — Client interface + `SignalResponseDataMessage` (sealed, supports JSON/Binary with GZip, caching, diagnostics).
|
- **`IAcSignalRHubClient.cs`** — Client interface + `SignalResponseDataMessage` (sealed, supports JSON/Binary with GZip, caching, diagnostics).
|
||||||
- **`IAcSignalRHubBase.cs`** — Base hub interface: `OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId)`.
|
- **`IAcSignalRHubBase.cs`** — Base hub interface: `OnReceiveMessage(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[] data)`.
|
||||||
|
- **`ISignalParams.cs`** — `ISignalParams` base interface + `SignalReceiveParams` (Status). Metadata travels as separate hub argument (AcBinary serialized), payload `byte[]` uses protocol fast-path (zero-copy).
|
||||||
|
|
||||||
### Message Tagging
|
### Message Tagging
|
||||||
- **`SignalMessageTagAttribute.cs`** — Three attributes: `TagAttribute` (base, int messageTag), `SignalRAttribute` (server method routing + client notification), `SignalRSendToClientAttribute` (client-side receive).
|
- **`SignalMessageTagAttribute.cs`** — Three attributes: `TagAttribute` (base, int messageTag), `SignalRAttribute` (server method routing + client notification), `SignalRSendToClientAttribute` (client-side receive).
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,12 @@ Client-side SignalR transport: custom binary protocol, tag-based dispatch. Sourc
|
||||||
Single hub method, tag-based dispatch:
|
Single hub method, tag-based dispatch:
|
||||||
|
|
||||||
```
|
```
|
||||||
Client ──OnReceiveMessage(tag, bytes, requestId)──► Server
|
Client ──OnReceiveMessage(tag, requestId, receiveParams, data)──► Server
|
||||||
Client ◄──OnReceiveMessage(tag, bytes, requestId)── Server
|
Client ◄──OnReceiveMessage(tag, requestId, receiveParams, data)── Server
|
||||||
```
|
```
|
||||||
|
|
||||||
Tag (int) determines server method. All calls go through `OnReceiveMessage`.
|
Tag (int) determines server method. All calls go through `OnReceiveMessage`.
|
||||||
|
Metadata (`SignalReceiveParams`) and payload (`byte[]`) travel as **separate hub arguments** — the `byte[]` uses the protocol's zero-copy fast-path, metadata is AcBinary serialized normally.
|
||||||
|
|
||||||
```
|
```
|
||||||
Client: Server:
|
Client: Server:
|
||||||
|
|
@ -72,16 +73,17 @@ Custom `IHubProtocol` (`"acbinary"`), replaces JSON. Zero-copy via `BufferWriter
|
||||||
|
|
||||||
> Wire format, argument framing, dual BWO pattern, length prefix patching: `SIGNALR_BINARY_PROTOCOL.md`
|
> Wire format, argument framing, dual BWO pattern, length prefix patching: `SIGNALR_BINARY_PROTOCOL.md`
|
||||||
|
|
||||||
### Response Message
|
### Metadata + Payload Separation
|
||||||
|
|
||||||
`SignalResponseDataMessage`:
|
`SignalReceiveParams` (separate hub argument, AcBinary serialized):
|
||||||
|
|
||||||
| Field | Type | Purpose |
|
| Field | Type | Purpose |
|
||||||
|-------|------|---------|
|
|-------|------|---------|
|
||||||
| `MessageTag` | int | Operation tag |
|
|
||||||
| `Status` | SignalResponseStatus | Success/Error |
|
| `Status` | SignalResponseStatus | Success/Error |
|
||||||
| `ResponseData` | byte[] | Serialized payload |
|
|
||||||
| `DataSerializerType` | AcSerializerType | Binary or Json |
|
`byte[] data` (separate hub argument, protocol fast-path, zero-copy).
|
||||||
|
|
||||||
|
`SignalResponseDataMessage` remains as **internal DTO** for callback routing — constructed in-memory from `receiveParams` + `data`, never serialized as envelope on wire.
|
||||||
|
|
||||||
Binary (default): `AcBinarySerializer.ToBinary(data)`. JSON fallback: `ToJson` → `GzipHelper.Compress`.
|
Binary (default): `AcBinarySerializer.ToBinary(data)`. JSON fallback: `ToJson` → `GzipHelper.Compress`.
|
||||||
|
|
||||||
|
|
@ -95,20 +97,22 @@ Binary (default): `AcBinarySerializer.ToBinary(data)`. JSON fallback: `ToJson`
|
||||||
├─ Primitives/strings/enums/value types → IdMessage
|
├─ Primitives/strings/enums/value types → IdMessage
|
||||||
└─ Complex → SignalPostJsonDataMessage<T> ⚠️ JSON-in-Binary tech debt
|
└─ Complex → SignalPostJsonDataMessage<T> ⚠️ JSON-in-Binary tech debt
|
||||||
3. SerializeToBinary(message)
|
3. SerializeToBinary(message)
|
||||||
4. HubConnection.SendAsync("OnReceiveMessage", tag, bytes, requestId)
|
4. SignalReceiveParams { Status = Success }
|
||||||
5. AcBinaryHubProtocol frames on wire
|
5. HubConnection.SendAsync("OnReceiveMessage", tag, requestId, receiveParams, bytes)
|
||||||
|
6. AcBinaryHubProtocol frames on wire (byte[] via fast-path, receiveParams via AcBinary)
|
||||||
```
|
```
|
||||||
|
|
||||||
### Server → Client
|
### Server → Client
|
||||||
|
|
||||||
```
|
```
|
||||||
OnReceiveMessage(tag, bytes, requestId)
|
OnReceiveMessage(tag, requestId, receiveParams, data)
|
||||||
|
├─ Construct SignalResponseDataMessage in-memory (no envelope deser):
|
||||||
|
│ └─ { Status = receiveParams.Status, DataSerializerType = Binary, ResponseData = data }
|
||||||
├─ Matching requestId in pending dict:
|
├─ Matching requestId in pending dict:
|
||||||
│ ├─ DeserializeFromBinary<SignalResponseDataMessage>(bytes)
|
|
||||||
│ ├─ Route: null→sync wait, Action→invoke, Func<Task>→await
|
│ ├─ Route: null→sync wait, Action→invoke, Func<Task>→await
|
||||||
│ └─ GetResponseData<T>(): Binary→BinaryTo<T>(), JSON→Decompress→Deserialize
|
│ └─ GetResponseData<T>(): Binary→BinaryTo<T>(), JSON→Decompress→Deserialize
|
||||||
└─ No match (broadcast):
|
└─ No match (broadcast):
|
||||||
└─ abstract MessageReceived(tag, bytes).Forget()
|
└─ abstract MessageReceived(tag, receiveParams, data).Forget()
|
||||||
```
|
```
|
||||||
|
|
||||||
Request pooling: `SignalRRequestModel` via `SignalRRequestModelPool` (ObjectPool + IResettable).
|
Request pooling: `SignalRRequestModel` via `SignalRRequestModelPool` (ObjectPool + IResettable).
|
||||||
|
|
@ -147,4 +151,5 @@ Request pooling: `SignalRRequestModel` via `SignalRRequestModelPool` (ObjectPool
|
||||||
| CRUD tags | `SignalRs/SignalRCrudTags.cs` |
|
| CRUD tags | `SignalRs/SignalRCrudTags.cs` |
|
||||||
| SendToClientType | `SignalRs/SendToClientType.cs` |
|
| SendToClientType | `SignalRs/SendToClientType.cs` |
|
||||||
| Message types | `SignalRs/IAcSignalRHubClient.cs` |
|
| Message types | `SignalRs/IAcSignalRHubClient.cs` |
|
||||||
|
| Params interface | `SignalRs/ISignalParams.cs` |
|
||||||
| Serialization | `SignalRs/SignalRSerializationHelper.cs` |
|
| Serialization | `SignalRs/SignalRSerializationHelper.cs` |
|
||||||
|
|
|
||||||
|
|
@ -84,7 +84,7 @@ When argument is `byte[]`, bypasses serializer:
|
||||||
2. INT32 prefix written with actual value (no patching)
|
2. INT32 prefix written with actual value (no patching)
|
||||||
3. `BinaryTypeCode.ByteArray(68)` + VarUInt length + raw bytes via BWO
|
3. `BinaryTypeCode.ByteArray(68)` + VarUInt length + raw bytes via BWO
|
||||||
|
|
||||||
Read side mirrors: if `targetType == typeof(byte[])` and first byte is `ByteArray`, deserializer bypassed → direct `SpanReader`.
|
Read side mirrors: if first byte is `ByteArray(0x44)`, deserializer bypassed → direct `SpanReader`. Detection is **wire-format only** (no targetType check) — ByteArray marker is unambiguous since no AcBinary object starts with 0x44 (version=1).
|
||||||
|
|
||||||
## Read Path
|
## Read Path
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -70,17 +70,18 @@ For full architecture see `AyCode.Services/docs/SIGNALR.md`.
|
||||||
|
|
||||||
| Term | Definition |
|
| Term | Definition |
|
||||||
|---|---|
|
|---|---|
|
||||||
| **OnReceiveMessage** | The single SignalR method for all communication. Signature: `(int messageTag, byte[] messageBytes, int? requestId)`. |
|
| **OnReceiveMessage** | The single SignalR method for all communication. Signature: `(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[] data)`. Metadata and payload are separate hub arguments — `byte[]` uses protocol zero-copy fast-path. |
|
||||||
|
| **SignalReceiveParams** | Lightweight metadata sent alongside message payload as separate hub argument. Contains `Status` (SignalResponseStatus). Implements `ISignalParams`. AcBinary serialized. |
|
||||||
| **Message Tag** | Integer identifier mapping to a method via `[SignalR(tag)]` or `[SignalRSendToClient(tag)]` attributes. |
|
| **Message Tag** | Integer identifier mapping to a method via `[SignalR(tag)]` or `[SignalRSendToClient(tag)]` attributes. |
|
||||||
| **DynamicMethodRegistry** | Resolves message tags to `MethodInfo` at runtime. Static `ConcurrentDictionary` cache with lazy scan on miss. |
|
| **DynamicMethodRegistry** | Resolves message tags to `MethodInfo` at runtime. Static `ConcurrentDictionary` cache with lazy scan on miss. |
|
||||||
| **SignalRCrudTags** | Sealed class bundling 5 independent tag integers (getAllTag, getItemTag, addTag, updateTag, removeTag) for entity CRUD. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`. |
|
| **SignalRCrudTags** | Sealed class bundling 5 independent tag integers (getAllTag, getItemTag, addTag, updateTag, removeTag) for entity CRUD. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`. |
|
||||||
| **AcBinaryHubProtocol** | Custom `IHubProtocol` replacing SignalR's JSON+Base64 with `AcBinarySerializer`. Protocol name: `"acbinary"`. Uses `BufferWriterBinaryOutput` for zero-copy writes to the SignalR pipe. |
|
| **AcBinaryHubProtocol** | Custom `IHubProtocol` replacing SignalR's JSON+Base64 with `AcBinarySerializer`. Protocol name: `"acbinary"`. Uses `BufferWriterBinaryOutput` for zero-copy writes to the SignalR pipe. |
|
||||||
| **SignalResponseDataMessage** | Response message supporting Binary or JSON+GZip. Responses use pure Binary (no JSON overhead). |
|
| **SignalResponseDataMessage** | Internal DTO for callback routing (not serialized on wire). Constructed in-memory from `receiveParams` + `data`. Supports Binary or JSON+GZip via `GetResponseData<T>()`. |
|
||||||
| **SignalPostJsonDataMessage** | ⚠️ TECH DEBT — request params serialized to JSON inside Binary envelope. Planned for pure Binary replacement. |
|
| **SignalPostJsonDataMessage** | ⚠️ TECH DEBT — request params serialized to JSON inside Binary envelope. Planned for pure Binary replacement. |
|
||||||
| **AcSignalRDataSource** | Generic real-time `IList<T>` with change tracking, CRUD via SignalRCrudTags, binary merge, rollback, sync state. |
|
| **AcSignalRDataSource** | Generic real-time `IList<T>` with change tracking, CRUD via SignalRCrudTags, binary merge, rollback, sync state. |
|
||||||
| **TrackingItem** | Wraps a modified DataSource item with `TrackingState` (Add/Update/Remove) + `OriginalValue` for rollback. |
|
| **TrackingItem** | Wraps a modified DataSource item with `TrackingState` (Add/Update/Remove) + `OriginalValue` for rollback. |
|
||||||
| **SendToClientType** | Enum controlling broadcast scope: None, Others, Caller, All. |
|
| **SendToClientType** | Enum controlling broadcast scope: None, Others, Caller, All. |
|
||||||
| **AcWebSignalRHubBase** | Abstract server hub. Receives `OnReceiveMessage`, dispatches via DynamicMethodRegistry, broadcasts to other clients. |
|
| **AcWebSignalRHubBase** | Abstract server hub. Receives `OnReceiveMessage`, dispatches via DynamicMethodRegistry, responds/broadcasts via `SendMessageToClient` (metadata + payload as separate args, no envelope). |
|
||||||
| **AcSignalRClientBase** | Abstract client. Manages `HubConnection`, request/response correlation via `requestId`, pooled `SignalRRequestModel`. |
|
| **AcSignalRClientBase** | Abstract client. Manages `HubConnection`, request/response correlation via `requestId`, pooled `SignalRRequestModel`. |
|
||||||
| **AcSessionService** | `ConcurrentDictionary`-based session tracker for connected SignalR clients. |
|
| **AcSessionService** | `ConcurrentDictionary`-based session tracker for connected SignalR clients. |
|
||||||
| **AcSignalRSendToClientService** | Server-push service: `SendMessageToAllClients`, `SendMessageToConnection`, `SendMessageToUser`. |
|
| **AcSignalRSendToClientService** | Server-push service: `SendMessageToAllClients`, `SendMessageToConnection`, `SendMessageToUser`. |
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue