523 lines
22 KiB
C#
523 lines
22 KiB
C#
using System.Buffers;
|
|
using System.Security.Claims;
|
|
using AyCode.Core;
|
|
using AyCode.Core.Extensions;
|
|
using AyCode.Core.Helpers;
|
|
using AyCode.Core.Loggers;
|
|
using AyCode.Core.Serializers;
|
|
using AyCode.Core.Serializers.Binaries;
|
|
using AyCode.Core.Serializers.Jsons;
|
|
using AyCode.Models.Server.DynamicMethods;
|
|
using AyCode.Services.SignalRs;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.Extensions.Configuration;
|
|
|
|
namespace AyCode.Services.Server.SignalRs;
|
|
|
|
public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration configuration, TLogger logger)
|
|
: Hub<IAcSignalRHubItemServer>, IAcSignalRHubServer where TSignalRTags : AcSignalRTags where TLogger : AcLoggerBase
|
|
{
|
|
/// <summary>
|
|
/// Registry for dynamic method lookups with lazy initialization.
|
|
/// </summary>
|
|
protected readonly AcDynamicMethodRegistry<SignalRAttribute> DynamicMethodRegistry = new();
|
|
|
|
protected TLogger Logger = logger;
|
|
protected IConfiguration Configuration = configuration;
|
|
|
|
protected AcSerializerOptions SerializerOptions = new AcBinarySerializerOptions();
|
|
|
|
/// <summary>
|
|
/// Enable diagnostic logging for binary serialization debugging.
|
|
/// Set to true to log hex dumps of serialized response data.
|
|
/// </summary>
|
|
public static bool EnableBinaryDiagnostics { get; set; } = false;
|
|
|
|
#region Connection Lifecycle
|
|
|
|
/// <summary>
/// Called when a client connection is established. Optionally wires up protocol diagnostics,
/// logs the connection and user identifiers, then delegates to the base hub.
/// </summary>
public override async Task OnConnectedAsync()
{
    // Enable protocol diagnostics to debug deserialization issues.
    if (EnableBinaryDiagnostics && AcBinaryHubProtocol.DiagnosticLogger is null)
    {
        // The diagnostic logger lives on a static member, so the delegate outlives this hub.
        // Capture only the logger in a local: a lambda that reads the Logger field would
        // capture 'this' and keep this hub instance reachable for as long as the delegate is set.
        var diagnosticLogger = Logger;
        AcBinaryHubProtocol.DiagnosticLogger ??= msg => diagnosticLogger.Info(msg);
    }

    Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}");
    LogContextUserNameAndId();

    await base.OnConnectedAsync();
}
|
|
|
|
/// <summary>
/// Called when a client connection is closed. Logs the disconnect (as an error when an
/// exception is supplied, otherwise at debug level), logs the context user and delegates to the base hub.
/// </summary>
/// <param name="exception">The exception that caused the disconnect, or <c>null</c> when none was reported.</param>
public override async Task OnDisconnectedAsync(Exception? exception)
{
    var connectionId = GetConnectionId();
    var userIdentifier = GetUserIdentifier();

    if (exception is not null)
    {
        Logger.Error($"Server OnDisconnectedAsync; ConnectionId: {connectionId}; UserIdentifier: {userIdentifier}", exception);
    }
    else
    {
        Logger.Debug($"Server OnDisconnectedAsync; ConnectionId: {connectionId}; UserIdentifier: {userIdentifier}");
    }

    LogContextUserNameAndId();

    await base.OnDisconnectedAsync(exception);
}
|
|
|
|
#endregion
|
|
|
|
#region Message Processing
|
|
|
|
/// <summary>
/// Hub entry point for a client message; forwards the tag, payload and request id to
/// <see cref="ProcessOnReceiveMessage"/> without a not-found callback.
/// </summary>
/// <param name="messageTag">Numeric tag identifying the target method.</param>
/// <param name="requestId">Optional request identifier passed through to the response.</param>
/// <param name="receiveParams">Receive parameters sent by the client; not used by this implementation.</param>
/// <param name="data">Binary payload of the message.</param>
public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalReceiveParams receiveParams, byte[] data)
    => ProcessOnReceiveMessage(messageTag, data, requestId, null);
|
|
|
|
/// <summary>
/// Hub entry point for a streaming request; forwards to <see cref="ProcessOnStreamMessage"/>
/// using the connection-aborted token of the current context for cancellation.
/// </summary>
/// <param name="messageTag">Numeric tag identifying the target method.</param>
/// <param name="messageBytes">Binary payload of the request, if any.</param>
public virtual IAsyncEnumerable<byte[]> OnReceiveStreamMessage(int messageTag, byte[]? messageBytes)
    => ProcessOnStreamMessage(messageTag, messageBytes, Context.ConnectionAborted);
|
|
|
|
/// <summary>
/// Looks up and invokes the method registered for <paramref name="messageTag"/> and yields its
/// result as binary chunks. An <see cref="IAsyncEnumerable{T}"/> result is streamed item by item;
/// any other non-null result is serialized and yielded as a single chunk. Nothing is yielded when
/// no method is registered or the method returns <c>null</c>.
/// </summary>
/// <param name="messageTag">Numeric tag identifying the target method.</param>
/// <param name="message">Binary payload with the serialized parameters, if any.</param>
/// <param name="cancellationToken">Token used to cancel enumeration of the underlying stream.</param>
protected virtual async IAsyncEnumerable<byte[]> ProcessOnStreamMessage(int messageTag, byte[]? message, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
    var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);

    Logger.Debug($"[{message?.Length ?? 0:N0}b] Server OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}");

    // Only try/finally is used here: the finally block logs the close on every exit path.
    try
    {
        if (AcDomain.IsDeveloperVersion) LogContextUserNameAndId();

        if (!TryFindAndInvokeMethod(messageTag, message, tagName, out var responseData))
        {
            Logger.Warning($"Not found dynamic method for the tag! {tagName}");
            yield break;
        }

        if (responseData == null)
        {
            yield break;
        }

        var streamItemType = GetAsyncEnumerableElementType(responseData.GetType());
        if (streamItemType == null)
        {
            // Not a stream: fall back to a single regular response message.
            Logger.Warning($"Method '{tagName}' does not return IAsyncEnumerable. Returning normal message as single chunk.");
            var singleResponse = CreateResponseMessage(messageTag, SignalResponseStatus.Success, responseData);
            yield return SignalRSerializationHelper.SerializeToBinary(singleResponse);
            yield break;
        }

        var chunks = GetTypedStream(streamItemType, responseData, messageTag, cancellationToken);
        await foreach (var chunk in chunks.WithCancellation(cancellationToken))
        {
            yield return chunk;
        }
    }
    finally
    {
        Logger.Debug($"Server closed OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}");
    }
}
|
|
|
|
private static readonly System.Collections.Concurrent.ConcurrentDictionary<Type, System.Reflection.MethodInfo> _streamMethods = new();
|
|
|
|
/// <summary>
/// Invokes <c>EnumerateGenericAsync&lt;T&gt;</c> through reflection with <paramref name="elementType"/>
/// as <c>T</c>. The closed generic method is cached per element type in <c>_streamMethods</c>.
/// </summary>
/// <param name="elementType">Element type of the asynchronous stream held by <paramref name="responseData"/>.</param>
/// <param name="responseData">The stream object returned by the invoked method.</param>
/// <param name="messageTag">Tag passed on for wrapping items into response messages.</param>
/// <param name="ct">Cancellation token passed on to the enumeration.</param>
private IAsyncEnumerable<byte[]> GetTypedStream(Type elementType, object responseData, int messageTag, CancellationToken ct)
{
    static System.Reflection.MethodInfo CloseOverElementType(Type itemType)
    {
        const System.Reflection.BindingFlags privateInstance =
            System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance;

        var openMethod = typeof(AcWebSignalRHubBase<TSignalRTags, TLogger>)
            .GetMethod(nameof(EnumerateGenericAsync), privateInstance)!;

        return openMethod.MakeGenericMethod(itemType);
    }

    var closedMethod = _streamMethods.GetOrAdd(elementType, CloseOverElementType);
    var arguments = new object[] { responseData, messageTag, ct };
    var stream = closedMethod.Invoke(this, arguments);

    return (IAsyncEnumerable<byte[]>)stream!;
}
|
|
|
|
/// <summary>
/// Enumerates <paramref name="result"/> as an <see cref="IAsyncEnumerable{T}"/> and converts each item
/// to bytes: byte arrays are passed through unchanged, <c>ISignalRMessage</c> items are serialized
/// directly, and every other item is wrapped in a success response message before serialization.
/// </summary>
/// <typeparam name="T">Element type of the stream.</typeparam>
/// <param name="result">The stream object; must be castable to <see cref="IAsyncEnumerable{T}"/>.</param>
/// <param name="messageTag">Tag used when wrapping plain items into a response message.</param>
/// <param name="cancellationToken">Token used to cancel the enumeration.</param>
private async IAsyncEnumerable<byte[]> EnumerateGenericAsync<T>(object result, int messageTag, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
    var source = (IAsyncEnumerable<T>)result;

    await foreach (var element in source.WithCancellation(cancellationToken))
    {
        yield return element switch
        {
            byte[] rawBytes => rawBytes,
            ISignalRMessage signalMessage => SignalRSerializationHelper.SerializeToBinary(signalMessage),
            _ => SignalRSerializationHelper.SerializeToBinary(CreateResponseMessage(messageTag, SignalResponseStatus.Success, element)),
        };
    }
}
|
|
|
|
/// <summary>
/// Returns the <c>T</c> of <see cref="IAsyncEnumerable{T}"/> for the given type. The type itself is
/// checked first, then its implemented interfaces; the first match wins.
/// </summary>
/// <param name="type">The runtime type to inspect.</param>
/// <returns>The stream element type, or <c>null</c> when the type is not an asynchronous stream.</returns>
private static Type? GetAsyncEnumerableElementType(Type type)
{
    static bool IsAsyncStream(Type candidate)
        => candidate.IsGenericType && candidate.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>);

    var streamType = IsAsyncStream(type)
        ? type
        : Array.Find(type.GetInterfaces(), IsAsyncStream);

    return streamType?.GetGenericArguments()[0];
}
|
|
|
|
/// <summary>
/// Looks up and invokes the method registered for <paramref name="messageTag"/> and sends the result
/// back to the caller as a success response. When no method is registered, or when any step throws,
/// an error response is sent to the caller instead.
/// </summary>
/// <param name="messageTag">Numeric tag identifying the target method.</param>
/// <param name="message">Binary payload with the serialized parameters, if any.</param>
/// <param name="requestId">Optional request identifier echoed back with the response.</param>
/// <param name="notFoundCallback">
/// Optional callback invoked with the tag name when no method is registered for the tag.
/// It is awaited before the error response is sent; exceptions it throws are logged like any other failure.
/// </param>
protected virtual async Task ProcessOnReceiveMessage(int messageTag, byte[]? message, int? requestId, Func<string, Task>? notFoundCallback)
{
    var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);

    if (message is { Length: 0 })
    {
        Logger.Warning($"message.Length == 0! Server OnReceiveMessage; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}");
    }
    else
    {
        Logger.Debug($"[{message?.Length:N0}b] Server OnReceiveMessage; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}");
    }

    try
    {
        if (AcDomain.IsDeveloperVersion) LogContextUserNameAndId();

        if (TryFindAndInvokeMethod(messageTag, message, tagName, out var responseData))
        {
            var responseMessage = CreateResponseMessage(messageTag, SignalResponseStatus.Success, responseData);

            // Size is only computed when it will actually be logged.
            if (Logger.LogLevel <= LogLevel.Debug)
            {
                var responseSize = GetResponseSize(responseMessage);
                Logger.Debug($"[{responseSize / 1024}kb] responseData serialized ({SerializerOptions.SerializerType})");
            }

            // Log binary diagnostics if enabled
            if (EnableBinaryDiagnostics && responseMessage is SignalResponseDataMessage dataMsg && dataMsg.ResponseData != null)
            {
                LogResponseDataDiagnostics(messageTag, tagName, requestId, dataMsg.ResponseData);
            }

            await ResponseToCaller(messageTag, responseMessage, requestId);
            return;
        }

        Logger.Warning($"Not found dynamic method for the tag! {tagName}");

        // Await the callback so its failures are observed (and logged by the catch below)
        // instead of being dropped with an un-awaited task.
        if (notFoundCallback != null)
        {
            await notFoundCallback(tagName);
        }
    }
    catch (Exception ex)
    {
        Logger.Error($"Server OnReceiveMessage; {ex.Message}; {tagName}", ex);
    }

    // Reached when no method was found or an exception was logged above.
    await ResponseToCaller(messageTag, CreateResponseMessage(messageTag, SignalResponseStatus.Error, null), requestId);
}
|
|
|
|
/// <summary>
/// Reads a VarUInt (7 data bits per byte, high bit set means another byte follows) from a byte array
/// starting at the given position.
/// </summary>
/// <param name="data">Buffer to read from.</param>
/// <param name="startPos">Index of the first byte of the encoded value.</param>
/// <returns>
/// The decoded value and the number of bytes consumed. At most five bytes are consumed, which is
/// all a 32-bit value can hold; reading also stops at the end of the buffer. Returns <c>(0, 0)</c>
/// when <paramref name="startPos"/> is at or past the end of the buffer.
/// </returns>
private static (uint value, int bytesRead) ReadVarUINTFromBytes(byte[] data, int startPos)
{
    const int BitsPerGroup = 7;

    // Highest shift that still lands inside a 32-bit value (fifth group). A shift of 35 would be
    // masked to 3 by the C# shift operator and silently corrupt the low bits of the result.
    const int MaxShift = 28;

    uint value = 0;
    var bytesRead = 0;

    for (var shift = 0; shift <= MaxShift && startPos + bytesRead < data.Length; shift += BitsPerGroup)
    {
        var current = data[startPos + bytesRead];
        bytesRead++;

        value |= (uint)(current & 0x7F) << shift;

        // A clear continuation bit marks the last byte of the value.
        if ((current & 0x80) == 0)
        {
            break;
        }
    }

    return (value, bytesRead);
}
|
|
|
|
/// <summary>
/// Writes reflection details about the runtime type of <paramref name="responseData"/> to the log at
/// info level. For generic types the generic arguments are listed and, when there is exactly one
/// argument, that argument's type details, base-type chain and properties are logged as well.
/// For non-generic types the base type and properties are logged. Failures are logged as a warning
/// and never propagate.
/// </summary>
/// <param name="responseData">The object whose runtime type is described.</param>
private void LogResponseDataTypeInfo(object responseData)
{
    try
    {
        var type = responseData.GetType();
        var assemblyName = type.Assembly.GetName();

        Logger.Info($"=== SERVER RESPONSE TYPE INFO (BEFORE SERIALIZE) ===");
        Logger.Info($"Runtime Type: {type.Name}");
        Logger.Info($"FullName: {type.FullName}");
        Logger.Info($"Namespace: {type.Namespace}");
        Logger.Info($"Assembly: {assemblyName.Name} v{assemblyName.Version}");
        Logger.Info($"AssemblyQualifiedName: {type.AssemblyQualifiedName}");
        Logger.Info($"Assembly Location: {type.Assembly.Location}");

        if (!type.IsGenericType)
        {
            Logger.Info($"BaseType: {type.BaseType?.FullName ?? "null"}");
            LogTypePropertiesServer(type, "Response");
            return;
        }

        // For collections, log element type info
        var genericArgs = type.GetGenericArguments();
        Logger.Info($"Generic Arguments: [{string.Join(", ", genericArgs.Select(t => t.FullName))}]");

        if (genericArgs.Length != 1)
        {
            return;
        }

        var elementType = genericArgs[0];
        var elementAssemblyName = elementType.Assembly.GetName();

        Logger.Info($"--- ELEMENT TYPE INFO ---");
        Logger.Info($"Element Type: {elementType.Name}");
        Logger.Info($"Element FullName: {elementType.FullName}");
        Logger.Info($"Element Namespace: {elementType.Namespace}");
        Logger.Info($"Element Assembly: {elementAssemblyName.Name} v{elementAssemblyName.Version}");
        Logger.Info($"Element AssemblyQualifiedName: {elementType.AssemblyQualifiedName}");
        Logger.Info($"Element Assembly Location: {elementType.Assembly.Location}");
        Logger.Info($"Element BaseType: {elementType.BaseType?.FullName ?? "null"}");

        // Walk the base types up to (but excluding) System.Object.
        var ancestors = new List<string>();
        for (var ancestor = elementType.BaseType; ancestor != null && ancestor != typeof(object); ancestor = ancestor.BaseType)
        {
            ancestors.Add($"{ancestor.Name} ({ancestor.Assembly.GetName().Name})");
        }

        if (ancestors.Count > 0)
        {
            Logger.Info($"Element Inheritance: {string.Join(" -> ", ancestors)}");
        }

        LogTypePropertiesServer(elementType, "Element");
    }
    catch (Exception ex)
    {
        Logger.Warning($"Failed to log response type info: {ex.Message}");
    }
}
|
|
|
|
/// <summary>
/// Logs every readable, non-indexer public instance property of <paramref name="type"/> together with
/// the type and assembly that declare it.
/// </summary>
/// <param name="type">The type whose properties are listed.</param>
/// <param name="prefix">Label placed in front of each log line.</param>
private void LogTypePropertiesServer(Type type, string prefix)
{
    const System.Reflection.BindingFlags publicInstance =
        System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance;

    // Properties are kept in the order reflection returns them (no sorting is applied).
    var readableProperties = Array.FindAll(
        type.GetProperties(publicInstance),
        candidate => candidate.CanRead && candidate.GetIndexParameters().Length == 0);

    Logger.Info($"{prefix} Property Count: {readableProperties.Length}");

    var index = 0;
    foreach (var property in readableProperties)
    {
        var owner = property.DeclaringType;
        var declaringType = owner?.Name ?? "?";
        var declaringAssembly = owner?.Assembly.GetName().Name ?? "?";

        Logger.Info($" {prefix}[{index}]: {property.Name} : {property.PropertyType.Name} (declared in {declaringType} @ {declaringAssembly})");
        index++;
    }
}
|
|
|
|
/// <summary>
/// Logs diagnostic information about the serialized response bytes: tag, request id, length, a hex
/// dump of up to the first 500 bytes, the first two bytes (logged as version and marker) and, when
/// bit 0x10 of the marker is set, a list of length-prefixed UTF-8 strings logged as header entries.
/// Failures are logged as a warning and never propagate.
/// </summary>
/// <param name="messageTag">Numeric tag of the message.</param>
/// <param name="tagName">Display name of the tag.</param>
/// <param name="requestId">Optional request identifier.</param>
/// <param name="responseData">Serialized response bytes to inspect.</param>
private void LogResponseDataDiagnostics(int messageTag, string tagName, int? requestId, byte[] responseData)
{
    try
    {
        Logger.Info($"=== SERVER RESPONSE DATA DIAGNOSTICS (AFTER SERIALIZE) ===");
        Logger.Info($"Tag: {messageTag} ({tagName}); RequestId: {requestId}; ResponseData.Length: {responseData.Length}");
        Logger.Info($"HEX (first 500 bytes): {Convert.ToHexString(responseData.AsSpan(0, Math.Min(500, responseData.Length)))}");

        if (responseData.Length < 3)
        {
            return;
        }

        var version = responseData[0];
        var marker = responseData[1];
        Logger.Info($"Version: {version}; Marker: 0x{marker:X2}");

        // NOTE(review): bit 0x10 presumably flags the presence of a property-name header — confirm against the serializer.
        if ((marker & 0x10) == 0)
        {
            return;
        }

        // Read property count as VarUInt
        var pos = 2;
        var (propCount, bytesRead) = ReadVarUINTFromBytes(responseData, pos);
        pos += bytesRead;

        Logger.Info($"Header property count: {propCount}");

        // The counter stays unsigned: casting a large count to int would make it negative and skip the loop.
        for (uint i = 0; i < propCount && pos < responseData.Length; i++)
        {
            // Read string length as VarUInt
            var (strLen, strLenBytes) = ReadVarUINTFromBytes(responseData, pos);
            pos += strLenBytes;

            // Compare unsigned against the remaining bytes: a length above int.MaxValue would
            // turn negative when cast to int and slip past a signed bounds check.
            var remaining = (uint)(responseData.Length - pos);
            if (strLen > remaining)
            {
                Logger.Info($" Header[{i}]: <truncated at pos {pos}>");
                break;
            }

            var nameLength = (int)strLen;
            var propName = System.Text.Encoding.UTF8.GetString(responseData, pos, nameLength);
            pos += nameLength;
            Logger.Info($" Header[{i}]: '{propName}'");
        }
    }
    catch (Exception ex)
    {
        Logger.Warning($"Failed to log response data diagnostics: {ex.Message}");
    }
}
|
|
|
|
/// <summary>
/// Builds a <c>SignalResponseDataMessage</c> for the given tag, status and payload using the hub's
/// current <c>SerializerOptions</c>.
/// </summary>
/// <param name="messageTag">Numeric tag of the message.</param>
/// <param name="status">Status reported to the client.</param>
/// <param name="responseData">Payload object, or <c>null</c> when there is none.</param>
protected virtual ISignalRMessage CreateResponseMessage(int messageTag, SignalResponseStatus status, object? responseData)
    => new SignalResponseDataMessage(messageTag, status, responseData, SerializerOptions);
|
|
|
|
/// <summary>
/// Returns the length of the response data carried by the message, for logging purposes.
/// </summary>
/// <param name="responseMessage">The message to measure.</param>
/// <returns>
/// The length of <c>ResponseData</c>, or 0 when the message is not a <c>SignalResponseDataMessage</c>
/// or carries no data.
/// </returns>
private static int GetResponseSize(ISignalRMessage responseMessage)
{
    if (responseMessage is SignalResponseDataMessage { ResponseData: { } payload })
    {
        return payload.Length;
    }

    return 0;
}
|
|
|
|
/// <summary>
/// Looks up the method registered for <paramref name="messageTag"/>, deserializes its parameters from
/// <paramref name="message"/> and invokes it. When the method's attribute requests it, the result is
/// also sent to the other clients without waiting for that send to finish.
/// </summary>
/// <param name="messageTag">Numeric tag identifying the target method.</param>
/// <param name="message">Binary payload with the serialized parameters, if any.</param>
/// <param name="tagName">Display name of the tag, used in log and error messages.</param>
/// <param name="responseData">Receives the value returned by the invoked method; <c>null</c> when no method was found.</param>
/// <returns><c>true</c> when a method was found and invoked; otherwise <c>false</c>.</returns>
private bool TryFindAndInvokeMethod(int messageTag, byte[]? message, string tagName, out object? responseData)
{
    responseData = null;

    var lookup = DynamicMethodRegistry.GetMethodByMessageTag(messageTag);
    if (!lookup.HasValue)
    {
        return false;
    }

    var (instance, methodInfoModel) = lookup.Value;
    var methodName = $"{instance.GetType().Name}.{methodInfoModel.MethodInfo.Name}";
    var paramValues = DeserializeParameters(message, methodInfoModel, tagName, methodName);

    if (paramValues == null)
    {
        Logger.Debug($"Found dynamic method for the tag! method: {methodName}(); {tagName}");
    }
    else
    {
        Logger.Debug($"Found dynamic method for the tag! method: {methodName}({string.Join(", ", methodInfoModel.ParamInfos.Select(x => x.Name))}); {tagName}");
    }

    responseData = methodInfoModel.MethodInfo.InvokeMethod(instance, paramValues);

    if (EnableBinaryDiagnostics && responseData != null)
    {
        LogResponseDataTypeInfo(responseData);
    }

    var attribute = methodInfoModel.Attribute;
    if (attribute.SendToOtherClientType != SendToClientType.None)
    {
        // Fire-and-forget broadcast; the caller's response does not wait for it.
        SendMessageToOthers(attribute.SendToOtherClientTag, responseData).Forget();
    }

    return true;
}
|
|
|
|
/// <summary>
/// Produces the argument values for a method invocation from the binary message.
/// </summary>
/// <param name="message">Binary payload sent by the client; may be <c>null</c> or empty.</param>
/// <param name="methodInfoModel">Model describing the target method and its parameters.</param>
/// <param name="tagName">Display name of the tag, used in the error message.</param>
/// <param name="methodName">Display name of the method, used in the error message.</param>
/// <returns>
/// <c>null</c> when the method declares no parameters; the declared default values when the message is
/// null or empty and every parameter has a default; otherwise the values deserialized from the message.
/// </returns>
/// <exception cref="ArgumentException">
/// The message is null or empty and at least one parameter has no default value.
/// </exception>
private static object[]? DeserializeParameters(byte[]? message, AcMethodInfoModel<SignalRAttribute> methodInfoModel, string tagName, string methodName)
{
    if (methodInfoModel.ParamInfos is not { Length: > 0 } parameters)
    {
        return null;
    }

    if (message is { Length: > 0 })
    {
        return SignalRSerializationHelper.DeserializeParametersFromBinary(message, parameters);
    }

    // No payload: only acceptable when every parameter can fall back to its default.
    if (!parameters.All(p => p.HasDefaultValue))
    {
        throw new ArgumentException($"Message is null or empty but method '{methodName}' requires parameters; {tagName}");
    }

    return parameters.Select(p => p.DefaultValue).ToArray();
}
|
|
|
|
#endregion
|
|
|
|
#region Response Methods
|
|
|
|
/// <summary>
/// Wraps <paramref name="content"/> in a success response message and sends it to the caller without a request id.
/// </summary>
protected virtual Task ResponseToCallerWithContent(int messageTag, object? content)
{
    var response = CreateResponseMessage(messageTag, SignalResponseStatus.Success, content);
    return ResponseToCaller(messageTag, response, null);
}
|
|
|
|
/// <summary>
/// Sends the given message to the calling client.
/// </summary>
protected virtual Task ResponseToCaller(int messageTag, ISignalRMessage message, int? requestId)
{
    return SendMessageToClient(Clients.Caller, messageTag, message, requestId);
}
|
|
|
|
/// <summary>
/// Wraps <paramref name="content"/> in a success response message and sends it to the given user without a request id.
/// </summary>
protected virtual Task SendMessageToUserIdWithContent(string userId, int messageTag, object? content)
{
    var response = CreateResponseMessage(messageTag, SignalResponseStatus.Success, content);
    return SendMessageToUserIdInternal(userId, messageTag, response, null);
}
|
|
|
|
/// <summary>
/// Sends the given message to the client proxy selected by <paramref name="userId"/>.
/// </summary>
protected virtual Task SendMessageToUserIdInternal(string userId, int messageTag, ISignalRMessage message, int? requestId)
{
    return SendMessageToClient(Clients.User(userId), messageTag, message, requestId);
}
|
|
|
|
/// <summary>
/// Wraps <paramref name="content"/> in a success response message and sends it to the given connection without a request id.
/// </summary>
protected virtual Task SendMessageToConnectionIdWithContent(string connectionId, int messageTag, object? content)
{
    var response = CreateResponseMessage(messageTag, SignalResponseStatus.Success, content);
    return SendMessageToConnectionIdInternal(connectionId, messageTag, response, null);
}
|
|
|
|
/// <summary>
/// Sends the given message to the client proxy selected by <paramref name="connectionId"/>.
/// </summary>
protected virtual Task SendMessageToConnectionIdInternal(string connectionId, int messageTag, ISignalRMessage message, int? requestId)
{
    return SendMessageToClient(Clients.Client(connectionId), messageTag, message, requestId);
}
|
|
|
|
/// <summary>
/// Wraps <paramref name="content"/> in a success response message and sends it via <c>Clients.Others</c> without a request id.
/// </summary>
protected virtual Task SendMessageToOthers(int messageTag, object? content)
{
    var others = Clients.Others;
    var response = CreateResponseMessage(messageTag, SignalResponseStatus.Success, content);
    return SendMessageToClient(others, messageTag, response, null);
}
|
|
|
|
/// <summary>
/// Wraps <paramref name="content"/> in a success response message and sends it via <c>Clients.All</c> without a request id.
/// </summary>
protected virtual Task SendMessageToAll(int messageTag, object? content)
{
    var everyone = Clients.All;
    var response = CreateResponseMessage(messageTag, SignalResponseStatus.Success, content);
    return SendMessageToClient(everyone, messageTag, response, null);
}
|
|
|
|
/// <summary>
/// Sends a response message to the given client proxy, logging before and after the send.
/// </summary>
/// <param name="sendTo">Client proxy that receives the message.</param>
/// <param name="messageTag">Numeric tag of the message.</param>
/// <param name="message">Message to send; it is cast to <c>SignalResponseDataMessage</c>, so any other implementation fails the cast.</param>
/// <param name="requestId">Optional request identifier passed on to the client.</param>
protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, ISignalRMessage message, int? requestId = null)
{
    var dataMessage = (SignalResponseDataMessage)message;

    var receiveParams = new SignalReceiveParams
    {
        Status = dataMessage.Status,
        DataSerializerType = dataMessage.DataSerializerType
    };

    // A message without data is sent as an empty payload.
    var payload = dataMessage.ResponseData ?? [];
    var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);

    Logger.Debug($"[{payload.Length / 1024}kb] Server sending message to client; requestId: {requestId}; Aborted: {IsConnectionAborted()}; ConnectionId: {GetConnectionId()}; {tagName}");

    await sendTo.OnReceiveMessage(messageTag, requestId, receiveParams, payload);

    Logger.Debug($"Server sent message to client; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}");
}
|
|
|
|
#endregion
|
|
|
|
#region Context Accessor Methods
|
|
|
|
/// <summary>Returns the connection id of the current hub context.</summary>
protected virtual string GetConnectionId()
{
    return Context.ConnectionId;
}
|
|
/// <summary>Returns whether cancellation has been requested on the context's connection-aborted token.</summary>
protected virtual bool IsConnectionAborted()
{
    var abortedToken = Context.ConnectionAborted;
    return abortedToken.IsCancellationRequested;
}
|
|
/// <summary>Returns the user identifier of the current hub context, if any.</summary>
protected virtual string? GetUserIdentifier()
{
    return Context.UserIdentifier;
}
|
|
/// <summary>Returns the claims principal of the current hub context, if any.</summary>
protected virtual ClaimsPrincipal? GetUser()
{
    return Context.User;
}
|
|
|
|
#endregion
|
|
|
|
#region Logging
|
|
|
|
/// <summary>
/// Logs the name and id of the user attached to the current context. Does nothing when there is no
/// user. The id is parsed as a <see cref="Guid"/> from the name-identifier claim and stays
/// <see cref="Guid.Empty"/> when parsing fails. Developer builds log through <c>WarningConditional</c>,
/// all others at debug level.
/// </summary>
protected virtual void LogContextUserNameAndId()
{
    if (GetUser() is not { } principal)
    {
        return;
    }

    var userName = principal.Identity?.Name;
    var rawUserId = principal.FindFirstValue(ClaimTypes.NameIdentifier);

    // The parse result is deliberately ignored; userId keeps its default on failure.
    Guid.TryParse(rawUserId, out var userId);

    if (!AcDomain.IsDeveloperVersion)
    {
        Logger.Debug($"SignalR.Context; userName: {userName}; userId: {userId}");
    }
    else
    {
        Logger.WarningConditional($"SignalR.Context; userName: {userName}; userId: {userId}");
    }
}
|
|
|
|
#endregion
|
|
} |