using System.Security.Claims;
using AyCode.Core;
using AyCode.Core.Extensions;
using AyCode.Core.Helpers;
using AyCode.Core.Loggers;
using AyCode.Core.Serializers;
using AyCode.Core.Serializers.Binaries;
using AyCode.Models.Server.DynamicMethods;
using AyCode.Services.SignalRs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;

namespace AyCode.Services.Server.SignalRs;

// NOTE(review): generic type-argument lists (class header, IAsyncEnumerable, ConcurrentDictionary, Func, ...)
// are not visible in the reviewed copy of this file; declarations below are reproduced exactly as found — confirm against the build.

/// <summary>
/// Base SignalR hub. Incoming messages carry an integer message tag; the hub looks the tag up in
/// <c>DynamicMethodRegistry</c>, invokes the registered method and sends the result back to the caller
/// (single response) or streams it back chunk by chunk (stream path).
/// </summary>
public abstract class AcWebSignalRHubBase(IConfiguration configuration, TLogger logger) : Hub, IAcSignalRHubServer where TSignalRTags : AcSignalRTags where TLogger : AcLoggerBase
{
    /// <summary>
    /// Registry for dynamic method lookups with lazy initialization.
    /// </summary>
    protected readonly AcDynamicMethodRegistry DynamicMethodRegistry = new();

    /// <summary>Logger received through the primary constructor.</summary>
    protected TLogger Logger = logger;

    /// <summary>Configuration received through the primary constructor.</summary>
    protected IConfiguration Configuration = configuration;

    /// <summary>Serializer options used for responses; initialized to binary serializer options.</summary>
    protected AcSerializerOptions SerializerOptions = new AcBinarySerializerOptions();

    #region Connection Lifecycle

    /// <summary>
    /// Logs the new connection (connection id, user identifier, user context) and then runs the base implementation.
    /// </summary>
    public override async Task OnConnectedAsync()
    {
        Logger.Debug($"Server OnConnectedAsync; ConnectionId: {GetConnectionId()}; UserIdentifier: {GetUserIdentifier()}");

        LogContextUserNameAndId();

        await base.OnConnectedAsync();
    }

    /// <summary>
    /// Logs the disconnect and then runs the base implementation.
    /// </summary>
    /// <param name="exception">
    /// The exception reported with the disconnect, or <c>null</c>. When it is not <c>null</c> the event is logged
    /// as an error together with the exception; otherwise it is logged at debug level.
    /// </param>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        var connectionId = GetConnectionId();
        var userIdentifier = GetUserIdentifier();

        if (exception == null)
            Logger.Debug($"Server OnDisconnectedAsync; ConnectionId: {connectionId}; UserIdentifier: {userIdentifier}");
        else
            Logger.Error($"Server OnDisconnectedAsync; ConnectionId: {connectionId}; UserIdentifier: {userIdentifier}", exception);

        LogContextUserNameAndId();

        await base.OnDisconnectedAsync(exception);
    }

    #endregion

    #region Message Processing

    /// <summary>
    /// Entry point for a regular (non-stream) message. Delegates to <c>ProcessOnReceiveMessage</c>
    /// without a not-found callback.
    /// </summary>
    /// <param name="messageTag">Tag identifying the registered method to invoke.</param>
    /// <param name="requestId">Optional request id that is passed back with the response.</param>
    /// <param name="signalParams">Carrier of the method parameters.</param>
    /// <param name="data">Not read by this implementation; the parameters are taken from <paramref name="signalParams"/>.</param>
    public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, object data)
    {
        return ProcessOnReceiveMessage(messageTag, signalParams, requestId, null);
    }

    /// <summary>
    /// Entry point for a stream message. Deserializes <paramref name="messageBytes"/> when it is non-empty
    /// and delegates to <c>ProcessOnStreamMessage</c>, using the connection-aborted token for cancellation.
    /// </summary>
    /// <param name="messageTag">Tag identifying the registered method to invoke.</param>
    /// <param name="messageBytes">Serialized parameters; <c>null</c> or empty means "no parameters".</param>
    public virtual IAsyncEnumerable OnReceiveStreamMessage(int messageTag, byte[]? messageBytes)
    {
        var parameterBytes = messageBytes is { Length: > 0 }
            ? SignalRSerializationHelper.DeserializeFromBinary(messageBytes)
            : null;

        return ProcessOnStreamMessage(messageTag, parameterBytes, Context.ConnectionAborted);
    }

    /// <summary>
    /// Invokes the method registered for <paramref name="messageTag"/> and yields its result as serialized chunks.
    /// </summary>
    /// <remarks>
    /// <list type="bullet">
    /// <item>A <c>null</c> result ends the stream without yielding anything.</item>
    /// <item>A result implementing <c>IAsyncEnumerable&lt;&gt;</c> is enumerated item by item.</item>
    /// <item>Any other result is wrapped in a stream response message and yielded as a single chunk (with a warning).</item>
    /// <item>When no method is registered for the tag, a warning is logged and nothing is yielded.</item>
    /// </list>
    /// There is no catch block here: exceptions propagate to the consumer of the stream; the closing
    /// debug log is written from the <c>finally</c> block in every case.
    /// </remarks>
    protected virtual async IAsyncEnumerable ProcessOnStreamMessage(int messageTag, byte[][]? parameterBytes, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var tagName = ConstHelper.NameByValue(messageTag);

        Logger.Debug($"Server OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}");

        try
        {
            if (AcDomain.IsDeveloperVersion) LogContextUserNameAndId();

            // Build SignalParams from raw byte[][] for stream path
            var signalParams = new SignalParams { Status = SignalResponseStatus.Success };

            if (parameterBytes is { Length: > 0 })
                signalParams.Parameters = parameterBytes.ToBinary();

            if (TryFindAndInvokeMethod(messageTag, signalParams, tagName, out var responseData))
            {
                if (responseData == null) yield break;

                var resultType = responseData.GetType();
                var elementType = GetAsyncEnumerableElementType(resultType);

                if (elementType != null)
                {
                    var typedEnumerable = GetTypedStream(elementType, responseData, messageTag, cancellationToken);

                    await foreach (var chunk in typedEnumerable.WithCancellation(cancellationToken))
                    {
                        yield return chunk;
                    }
                }
                else
                {
                    Logger.Warning($"Method '{tagName}' does not return IAsyncEnumerable. Returning normal message as single chunk.");

                    var responseMessage = CreateStreamResponseMessage(messageTag, SignalResponseStatus.Success, responseData);
                    yield return SignalRSerializationHelper.SerializeToBinary(responseMessage);
                }
            }
            else
            {
                Logger.Warning($"Not found dynamic method for the tag! {tagName}");
            }
        }
        finally
        {
            Logger.Debug($"Server closed OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}");
        }
    }

    // Cache of the constructed EnumerateGenericAsync method per stream element type,
    // so the reflection lookup + MakeGenericMethod is done once per element type.
    private static readonly System.Collections.Concurrent.ConcurrentDictionary _streamMethods = new();

    /// <summary>
    /// Adapts an async stream whose element type is only known at runtime by invoking
    /// <c>EnumerateGenericAsync</c>, constructed for <paramref name="elementType"/>, through reflection.
    /// </summary>
    private IAsyncEnumerable GetTypedStream(Type elementType, object responseData, int messageTag, CancellationToken ct)
    {
        var methodInfo = _streamMethods.GetOrAdd(elementType, type => typeof(AcWebSignalRHubBase)
            .GetMethod(nameof(EnumerateGenericAsync), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
            .MakeGenericMethod(type));

        return (IAsyncEnumerable)methodInfo.Invoke(this, [responseData, messageTag, ct])!;
    }

    /// <summary>
    /// Enumerates <paramref name="result"/> and converts every item to a serialized chunk:
    /// <c>byte[]</c> items are passed through unchanged, <c>ISignalRMessage</c> items are serialized directly,
    /// and everything else is wrapped in a stream response message before serialization.
    /// </summary>
    private async IAsyncEnumerable EnumerateGenericAsync(object result, int messageTag, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var enumerable = (IAsyncEnumerable)result;

        await foreach (var item in enumerable.WithCancellation(cancellationToken))
        {
            if (item is byte[] bytes)
            {
                yield return bytes;
            }
            else if (item is ISignalRMessage sigMsg)
            {
                yield return SignalRSerializationHelper.SerializeToBinary(sigMsg);
            }
            else
            {
                var msg = CreateStreamResponseMessage(messageTag, SignalResponseStatus.Success, item);
                yield return SignalRSerializationHelper.SerializeToBinary(msg);
            }
        }
    }

    /// <summary>
    /// Returns the element type when <paramref name="type"/> is, or implements, a constructed
    /// <c>IAsyncEnumerable&lt;&gt;</c>; otherwise <c>null</c>. The type itself is checked before its interfaces.
    /// </summary>
    private static Type? GetAsyncEnumerableElementType(Type type)
    {
        if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>))
            return type.GetGenericArguments()[0];

        foreach (var intf in type.GetInterfaces())
        {
            if (intf.IsGenericType && intf.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>))
                return intf.GetGenericArguments()[0];
        }

        return null;
    }

    /// <summary>
    /// Invokes the method registered for <paramref name="messageTag"/> and answers the caller.
    /// </summary>
    /// <remarks>
    /// On success the result is sent with <c>SignalResponseStatus.Success</c> and the method returns.
    /// When no method is registered, a warning is logged and <paramref name="notFoundCallback"/> (if any) is invoked.
    /// When anything inside the try block throws, the exception is logged and not rethrown.
    /// In both of the latter cases an empty <c>SignalResponseStatus.Error</c> response is sent to the caller.
    /// </remarks>
    /// <param name="messageTag">Tag identifying the registered method to invoke.</param>
    /// <param name="signalParams">Carrier of the method parameters.</param>
    /// <param name="requestId">Optional request id that is passed back with the response.</param>
    /// <param name="notFoundCallback">Optional callback invoked with the tag name when no method is registered; its return value is not used here.</param>
    protected virtual async Task ProcessOnReceiveMessage(int messageTag, SignalParams signalParams, int? requestId, Func? notFoundCallback)
    {
        var tagName = ConstHelper.NameByValue(messageTag);

        Logger.Debug($"Server OnReceiveMessage; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}");

        try
        {
            if (AcDomain.IsDeveloperVersion) LogContextUserNameAndId();

            if (TryFindAndInvokeMethod(messageTag, signalParams, tagName, out var responseData))
            {
                if (Logger.LogLevel <= LogLevel.Debug)
                    Logger.Debug($"responseData ready ({SerializerOptions.SerializerType})");

                await ResponseToCaller(messageTag, SignalResponseStatus.Success, responseData, requestId);
                return;
            }

            Logger.Warning($"Not found dynamic method for the tag! {tagName}");

            notFoundCallback?.Invoke(tagName);
        }
        catch (Exception ex)
        {
            Logger.Error($"Server OnReceiveMessage; {ex.Message}; {tagName}", ex);
        }

        // Reached for "method not found" and for any exception above: the caller always gets an answer.
        await ResponseToCaller(messageTag, SignalResponseStatus.Error, null, requestId);
    }

    /// <summary>
    /// Creates a SignalResponseDataMessage for stream path (serialized as wire format blob).
    /// Main send path uses SendMessageToClient directly — no wrapper needed.
    /// </summary>
    protected SignalResponseDataMessage CreateStreamResponseMessage(int messageTag, SignalResponseStatus status, object? responseData)
        => new(messageTag, status, responseData, SerializerOptions);

    /// <summary>
    /// Finds and invokes the method registered for the given message tag.
    /// </summary>
    /// <param name="messageTag">Tag to look up in <c>DynamicMethodRegistry</c>.</param>
    /// <param name="signalParams">Carrier of the method parameters.</param>
    /// <param name="tagName">Display name of the tag, used in log and exception messages.</param>
    /// <param name="responseData">Value returned by the invoked method; <c>null</c> when no method was found.</param>
    /// <returns><c>true</c> when a method was found and invoked; <c>false</c> when the tag is not registered.</returns>
    /// <remarks>Exceptions from parameter deserialization or from the invocation are not caught here.</remarks>
    private bool TryFindAndInvokeMethod(int messageTag, SignalParams signalParams, string tagName, out object? responseData)
    {
        responseData = null;

        var result = DynamicMethodRegistry.GetMethodByMessageTag(messageTag);
        if (!result.HasValue) return false;

        var (instance, methodInfoModel) = result.Value;
        var methodName = $"{instance.GetType().Name}.{methodInfoModel.MethodInfo.Name}";

        var paramValues = DeserializeParameters(signalParams, methodInfoModel, tagName, methodName);

        Logger.Debug(paramValues == null
            ? $"Found dynamic method for the tag! method: {methodName}(); {tagName}"
            : $"Found dynamic method for the tag! method: {methodName}({string.Join(", ", methodInfoModel.ParamInfos.Select(x => x.Name))}); {tagName}");

        responseData = methodInfoModel.MethodInfo.InvokeMethod(instance, paramValues);

        // Optional broadcast of the result to the other clients; the send is started but not awaited.
        if (methodInfoModel.Attribute.SendToOtherClientType != SendToClientType.None)
            SendMessageToOthers(methodInfoModel.Attribute.SendToOtherClientTag, responseData).Forget();

        return true;
    }

    /// <summary>
    /// Deserializes parameters from SignalParams using GetParameterValues.
    /// Validates that required parameters are present.
    /// </summary>
    /// <param name="signalParams">Carrier of the serialized parameters.</param>
    /// <param name="methodInfoModel">Model of the target method; its <c>ParamInfos</c> drive deserialization and validation.</param>
    /// <param name="tagName">Display name of the tag, used in exception messages.</param>
    /// <param name="methodName">Display name of the target method, used in exception messages.</param>
    /// <returns>
    /// <c>null</c> when the method declares no parameters; the declared default values when the message carries
    /// no parameters and every parameter has a default; otherwise the deserialized values.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The message carries no parameters although at least one parameter has no default value, or a
    /// non-nullable value-type parameter without a default value has no value.
    /// </exception>
    private static object[]? DeserializeParameters(SignalParams signalParams, AcMethodInfoModel methodInfoModel, string tagName, string methodName)
    {
        var paramInfos = methodInfoModel.ParamInfos;
        if (paramInfos is not { Length: > 0 }) return null;

        var paramValues = signalParams.GetParameterValues(paramInfos);

        if (paramValues is null)
        {
            if (paramInfos.All(p => p.HasDefaultValue))
                return paramInfos.Select(p => p.DefaultValue).ToArray();

            throw new ArgumentException($"Message has no parameters but method '{methodName}' requires parameters; {tagName}");
        }

        // Validate: null in a non-optional parameter slot means it was missing
        for (var i = 0; i < paramInfos.Length; i++)
        {
            if (paramValues[i] is not null || paramInfos[i].HasDefaultValue) continue;

            var parameterType = paramInfos[i].ParameterType;

            // Reference types may legitimately be null.
            if (!parameterType.IsValueType) continue;

            // Nullable<T> is a value type as well, but null is a valid argument for it,
            // so it must not be reported as "not sent".
            if (Nullable.GetUnderlyingType(parameterType) is not null) continue;

            throw new ArgumentException($"Method '{methodName}' requires parameter '{paramInfos[i].Name}' (index {i}) but it was not sent; {tagName}");
        }

        return paramValues;
    }

    #endregion

    #region Response Methods

    /// <summary>Sends <paramref name="content"/> to the caller with <c>SignalResponseStatus.Success</c> and no request id.</summary>
    protected virtual Task ResponseToCallerWithContent(int messageTag, object? content)
        => SendMessageToClient(Clients.Caller, messageTag, SignalResponseStatus.Success, content);

    /// <summary>Sends a response with the given status, data and request id to the caller.</summary>
    protected virtual Task ResponseToCaller(int messageTag, SignalResponseStatus status, object? responseData, int? requestId)
        => SendMessageToClient(Clients.Caller, messageTag, status, responseData, requestId);

    /// <summary>Sends <paramref name="content"/> with <c>SignalResponseStatus.Success</c> to the user selected by <paramref name="userId"/>.</summary>
    protected virtual Task SendMessageToUserIdWithContent(string userId, int messageTag, object? content)
        => SendMessageToClient(Clients.User(userId), messageTag, SignalResponseStatus.Success, content);

    /// <summary>Sends <paramref name="content"/> with <c>SignalResponseStatus.Success</c> to the connection selected by <paramref name="connectionId"/>.</summary>
    protected virtual Task SendMessageToConnectionIdWithContent(string connectionId, int messageTag, object? content)
        => SendMessageToClient(Clients.Client(connectionId), messageTag, SignalResponseStatus.Success, content);

    /// <summary>Sends <paramref name="content"/> with <c>SignalResponseStatus.Success</c> through <c>Clients.Others</c>.</summary>
    protected virtual Task SendMessageToOthers(int messageTag, object? content)
        => SendMessageToClient(Clients.Others, messageTag, SignalResponseStatus.Success, content);

    /// <summary>Sends <paramref name="content"/> with <c>SignalResponseStatus.Success</c> through <c>Clients.All</c>.</summary>
    protected virtual Task SendMessageToAll(int messageTag, object? content)
        => SendMessageToClient(Clients.All, messageTag, SignalResponseStatus.Success, content);

    /// <summary>
    /// Sends message to client. Protocol serializes responseData directly to pipe (zero-copy write).
    /// </summary>
    /// <param name="sendTo">Client proxy that receives the message.</param>
    /// <param name="messageTag">Tag of the message being sent.</param>
    /// <param name="status">Status placed into the accompanying <c>SignalParams</c>.</param>
    /// <param name="responseData">Payload; may be <c>null</c>, in which case no data type name is recorded.</param>
    /// <param name="requestId">Optional request id forwarded to the client.</param>
    protected virtual async Task SendMessageToClient(IAcSignalRHubItemServer sendTo, int messageTag, SignalResponseStatus status, object? responseData, int? requestId = null)
    {
        var signalParams = new SignalParams
        {
            Status = status,
            DataSerializerType = SerializerOptions.SerializerType,
            // Runtime type of the payload as an assembly-qualified name; null when there is no payload
            SignalDataType = responseData?.GetType().AssemblyQualifiedName
        };

        var tagName = ConstHelper.NameByValue(messageTag);

        Logger.Debug($"Server sending message to client; requestId: {requestId}; Aborted: {IsConnectionAborted()}; ConnectionId: {GetConnectionId()}; {tagName}");

        await sendTo.OnReceiveMessage(messageTag, requestId, signalParams, responseData!);

        Logger.Debug($"Server sent message to client; requestId: {requestId}; ConnectionId: {GetConnectionId()}; {tagName}");
    }

    #endregion

    #region Context Accessor Methods

    /// <summary>Returns <c>Context.ConnectionId</c>.</summary>
    protected virtual string GetConnectionId() => Context.ConnectionId;

    /// <summary>Returns whether cancellation has been requested on <c>Context.ConnectionAborted</c>.</summary>
    protected virtual bool IsConnectionAborted() => Context.ConnectionAborted.IsCancellationRequested;

    /// <summary>Returns <c>Context.UserIdentifier</c>.</summary>
    protected virtual string? GetUserIdentifier() => Context.UserIdentifier;

    /// <summary>Returns <c>Context.User</c>.</summary>
    protected virtual ClaimsPrincipal? GetUser() => Context.User;

    #endregion

    #region Logging

    /// <summary>
    /// Logs the user name and the user id (parsed from the <c>ClaimTypes.NameIdentifier</c> claim) of the current user.
    /// Does nothing when there is no user. Uses <c>WarningConditional</c> in developer versions and <c>Debug</c> otherwise.
    /// </summary>
    protected virtual void LogContextUserNameAndId()
    {
        var user = GetUser();
        if (user == null) return;

        var userName = user.Identity?.Name;

        // The parse result is intentionally ignored: the id is only logged, whatever TryParse produced.
        Guid.TryParse(user.FindFirstValue(ClaimTypes.NameIdentifier), out var userId);

        if (AcDomain.IsDeveloperVersion)
            Logger.WarningConditional($"SignalR.Context; userName: {userName}; userId: {userId}");
        else
            Logger.Debug($"SignalR.Context; userName: {userName}; userId: {userId}");
    }

    #endregion
}