using System.Collections.Concurrent;
using AyCode.Core;
using AyCode.Core.Extensions;
using AyCode.Core.Helpers;
using AyCode.Core.Loggers;
using AyCode.Interfaces.Entities;
using MessagePack.Resolvers;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;
using static AyCode.Core.Extensions.JsonUtilities;

namespace AyCode.Services.SignalRs
{
    /// <summary>
    /// Base class for a SignalR hub client. Outgoing messages are identified by an integer
    /// message tag and serialized with MessagePack; responses are correlated with their
    /// requests through the <c>_responseByRequestId</c> dictionary, keyed by request id.
    /// Derived classes implement <c>MessageReceived</c> to handle messages that do not belong
    /// to a pending request.
    /// </summary>
    // NOTE(review): several declarations in this copy have no generic type arguments
    // (e.g. "ConcurrentDictionary", "Func, Task>", "GetAllAsync>(...)"). They were presumably
    // lost when the text was extracted - restore them from the canonical source before compiling.
    public abstract class AcSignalRClientBase : IAcSignalRHubClient
    {
        // Pending requests by request id. A model's ResponseByRequestId is either null (a caller
        // is polling for the response), a callback delegate, or the received response message.
        private readonly ConcurrentDictionary _responseByRequestId = new();

        // Null when the instance was created through the test constructor.
        protected readonly HubConnection? HubConnection;
        protected readonly AcLoggerBase Logger;

        //protected event Action OnMessageReceived = null!;

        /// <summary>
        /// Called by <c>OnReceiveMessage</c> for messages that are not matched to a pending request.
        /// </summary>
        protected abstract Task MessageReceived(int messageTag, byte[] messageBytes);

        // MsDelay and MsFirstDelay are passed to TaskHelper.WaitToAsync while waiting for a response.
        // NOTE(review): presumably the polling interval and the initial delay, in milliseconds - confirm.
        public int MsDelay = 25;
        public int MsFirstDelay = 50;

        // Passed to TaskHelper.WaitToAsync in StartConnection (presumably milliseconds - confirm).
        public int ConnectionTimeout = 10000;

        // Passed to TaskHelper.WaitToAsync while waiting for a response (presumably milliseconds - confirm).
        public int TransportSendTimeout = 60000;

        // Name handed to ConstHelper.NameByValue to render a message tag in log messages.
        private const string TagsName = "SignalRTags";

        /// <summary>
        /// Production constructor - builds the HubConnection (WebSockets transport, negotiation skipped)
        /// and subscribes to its Closed event and to OnReceiveMessage.
        /// The connection is NOT started here (the StartAsync call below is commented out);
        /// it is started by <c>StartConnection</c>.
        /// </summary>
        /// <param name="fullHubName">Hub URL passed to WithUrl.</param>
        /// <param name="logger">Logger used by this client.</param>
        protected AcSignalRClientBase(string fullHubName, AcLoggerBase logger)
        {
            Logger = logger;
            Logger.Detail(fullHubName);

            //TODO: HubConnectionBuilder constructor!!! - J.
            HubConnection = new HubConnectionBuilder()
                //.WithUrl(fullHubName)
                .WithUrl(fullHubName, HttpTransportType.WebSockets, options =>
                {
                    options.TransportMaxBufferSize = 30_000_000; //Increasing this value allows the client to receive larger messages. default: 65KB; unlimited: 0;;
                    options.ApplicationMaxBufferSize = 30_000_000; //Increasing this value allows the client to send larger messages. default: 65KB; unlimited: 0;
                    options.CloseTimeout = TimeSpan.FromSeconds(10); //default: 5 sec.
                    options.SkipNegotiation = true; // Skip HTTP negotiation when using WebSockets only

                    //options.AccessTokenProvider = null;
                    //options.HttpMessageHandlerFactory = null;
                    //options.Headers["CustomData"] = "value";
                    //options.SkipNegotiation = true;
                    //options.ClientCertificates = new System.Security.Cryptography.X509Certificates.X509CertificateCollection();
                    //options.Cookies = new System.Net.CookieContainer();
                    //options.DefaultTransferFormat = TransferFormat.Text;
                    //options.Credentials = null;
                    //options.Proxy = null;
                    //options.UseDefaultCredentials = true;
                    //options.WebSocketConfiguration = null;
                    //options.WebSocketFactory = null;
                })
                //.ConfigureLogging(logging =>
                //{
                //    logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Trace);
                //    logging.AddConsole();
                //})
                .WithAutomaticReconnect()
                .WithStatefulReconnect()
                .WithKeepAliveInterval(TimeSpan.FromSeconds(60))
                .WithServerTimeout(TimeSpan.FromSeconds(180))
                //.AddMessagePackProtocol(options => {
                //    options.SerializerOptions = MessagePackSerializerOptions.Standard
                //        .WithResolver(MessagePack.Resolvers.StandardResolver.Instance)
                //        .WithSecurity(MessagePackSecurity.UntrustedData)
                //        .WithCompression(MessagePackCompression.Lz4Block)
                //        .WithCompressionMinLength(256);})
                .Build();

            HubConnection.Closed += HubConnection_Closed;

            _ = HubConnection.On(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage);
            //_ = HubConnection.On(nameof(IAcSignalRHubClient.OnRequestMessage), OnRequestMessage);

            //HubConnection.StartAsync().Forget();
        }

        /// <summary>
        /// Test constructor - allows testing without a real HubConnection (it is left null).
        /// Override the virtual methods to control behavior in tests.
        /// </summary>
        protected AcSignalRClientBase(AcLoggerBase logger)
        {
            Logger = logger;
            HubConnection = null;
        }

        /// <summary>
        /// Handler for HubConnection.Closed: logs the number of pending requests (as a warning
        /// when there are any) and then clears them.
        /// </summary>
        // NOTE(review): after the clear, a caller still polling in the awaiting
        // SendMessageToServerAsync overload reads _responseByRequestId[requestId] for a key that
        // no longer exists; that lookup throws and ends up in that method's catch block.
        private Task HubConnection_Closed(Exception? arg)
        {
            if (_responseByRequestId.IsEmpty) Logger.DebugConditional($"Client HubConnection_Closed");
            else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}");

            ClearPendingRequests();
            return Task.CompletedTask;
        }

        #region Connection State Methods (virtual for testing)

        /// <summary>
        /// Gets the current connection state; Disconnected when there is no HubConnection.
        /// Override in tests.
        /// </summary>
        protected virtual HubConnectionState GetConnectionState() => HubConnection?.State ?? HubConnectionState.Disconnected;

        /// <summary>
        /// Checks if the connection state is Connected. Override in tests.
        /// </summary>
        protected virtual bool IsConnected() => GetConnectionState() == HubConnectionState.Connected;

        /// <summary>
        /// Starts the connection; a no-op when there is no HubConnection.
        /// Override in tests to avoid a real connection.
        /// </summary>
        protected virtual Task StartConnectionInternal()
        {
            if (HubConnection == null) return Task.CompletedTask;
            return HubConnection.StartAsync();
        }

        /// <summary>
        /// Stops the connection; a no-op when there is no HubConnection. Override in tests.
        /// </summary>
        protected virtual Task StopConnectionInternal()
        {
            if (HubConnection == null) return Task.CompletedTask;
            return HubConnection.StopAsync();
        }

        /// <summary>
        /// Disposes the connection; a no-op when there is no HubConnection. Override in tests.
        /// </summary>
        protected virtual ValueTask DisposeConnectionInternal()
        {
            if (HubConnection == null) return ValueTask.CompletedTask;
            return HubConnection.DisposeAsync();
        }

        /// <summary>
        /// Sends a message to the server via HubConnection.SendAsync, targeting the hub method
        /// named OnReceiveMessage. A no-op when there is no HubConnection. Override in tests.
        /// </summary>
        protected virtual Task SendToHubAsync(int messageTag, byte[]? messageBytes, int? requestId)
        {
            if (HubConnection == null) return Task.CompletedTask;
            return HubConnection.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, messageBytes, requestId);
        }

        #endregion

        #region Protected Test Helpers

        /// <summary>
        /// Gets the pending requests dictionary for testing (the live instance, not a copy).
        /// </summary>
        protected ConcurrentDictionary GetPendingRequests() => _responseByRequestId;

        /// <summary>
        /// Clears all pending requests.
        /// </summary>
        // NOTE(review): the removed models are not handed back to SignalRRequestModelPool,
        // unlike every other removal path in this class - confirm whether that is intended.
        protected void ClearPendingRequests() => _responseByRequestId.Clear();

        /// <summary>
        /// Registers (or overwrites) a pending request for testing.
        /// </summary>
        protected void RegisterPendingRequest(int requestId, SignalRRequestModel model) => _responseByRequestId[requestId] = model;

        /// <summary>
        /// Simulates receiving a response for testing: stores the response on the pending model
        /// and stamps its ResponseDateTime. Does nothing when the request id is not pending.
        /// </summary>
        protected void SimulateResponse(int requestId, ISignalResponseMessage response)
        {
            if (_responseByRequestId.TryGetValue(requestId, out var model))
            {
                model.ResponseByRequestId = response;
                model.ResponseDateTime = DateTime.UtcNow;
            }
        }

        #endregion

        /// <summary>
        /// Starts the connection when it is Disconnected, then waits (bounded by ConnectionTimeout)
        /// until IsConnected reports true. The outcome of the wait is not returned; callers
        /// re-check IsConnected themselves.
        /// </summary>
        public async Task StartConnection()
        {
            if (GetConnectionState() == HubConnectionState.Disconnected)
                await StartConnectionInternal();

            if (!IsConnected())
                await TaskHelper.WaitToAsync(IsConnected, ConnectionTimeout, 10, 25);
        }

        /// <summary>
        /// Stops and then disposes the connection.
        /// </summary>
        // NOTE(review): the HubConnection field is readonly and is disposed here, so a later
        // StartConnection on the same instance presumably cannot succeed - confirm.
        public async Task StopConnection()
        {
            await StopConnectionInternal();
            await DisposeConnectionInternal();
        }

        /// <summary>
        /// Sends a message that has no payload, using a freshly generated request id.
        /// </summary>
        public virtual Task SendMessageToServerAsync(int messageTag)
            => SendMessageToServerAsync(messageTag, null, GetNextRequestId());

        /// <summary>
        /// Ensures the connection is started, serializes the message with MessagePack and sends it.
        /// When the client is still not connected, an error is logged and the method returns
        /// without sending and without throwing.
        /// </summary>
        public virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int? requestId)
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync sending; {nameof(requestId)}: {requestId}; ConnectionState: {GetConnectionState()}; {ConstHelper.NameByValue(TagsName, messageTag)}");

            await StartConnection();

            var msgp = message?.ToMessagePack(ContractlessStandardResolver.Options);

            if (!IsConnected())
            {
                Logger.Error($"Client SendMessageToServerAsync error! ConnectionState: {GetConnectionState()};");
                return;
            }

            await SendToHubAsync(messageTag, msgp, requestId);
        }

        #region CRUD

        // The overloads below are thin wrappers: they wrap their arguments in a message object
        // and forward to one of the SendMessageToServerAsync overloads - either the awaiting one
        // (with a new request id) or the callback-based one.

        /// <summary>Posts a single parameter wrapped in an IdMessage.</summary>
        public virtual Task PostAsync(int messageTag, object parameter) //where TResponseData : class
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(parameter)), GetNextRequestId());

        /// <summary>Posts several parameters wrapped in an IdMessage.</summary>
        public virtual Task PostAsync(int messageTag, object[] parameters) //where TResponseData : class
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(parameters)), GetNextRequestId());

        /// <summary>Forwards to PostAsync with a single id.</summary>
        public virtual Task GetByIdAsync(int messageTag, object id) //where TResponseData : class
            => PostAsync(messageTag, id);

        /// <summary>Sends a single id wrapped in an IdMessage; the response is delivered to the callback.</summary>
        public virtual Task GetByIdAsync(int messageTag, Func, Task> responseCallback, object id)
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(id)), responseCallback);

        /// <summary>Forwards to PostAsync with several ids.</summary>
        public virtual Task GetByIdAsync(int messageTag, object[] ids) //where TResponseData : class
            => PostAsync(messageTag, ids);

        /// <summary>Sends several ids wrapped in an IdMessage; the response is delivered to the callback.</summary>
        public virtual Task GetByIdAsync(int messageTag, Func, Task> responseCallback, object[] ids)
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(ids)), responseCallback);

        /// <summary>Sends the tag without a payload.</summary>
        public virtual Task GetAllAsync(int messageTag) //where TResponseData : class
            => SendMessageToServerAsync(messageTag);

        /// <summary>Sends the tag without a payload; the response is delivered to the callback.</summary>
        public virtual Task GetAllAsync(int messageTag, Func, Task> responseCallback)
            => SendMessageToServerAsync(messageTag, null, responseCallback);

        /// <summary>
        /// Sends the tag with optional context parameters (no payload when the array is null or empty);
        /// the response is delivered to the callback.
        /// </summary>
        public virtual Task GetAllAsync(int messageTag, Func, Task> responseCallback, object[]? contextParams)
            => SendMessageToServerAsync(messageTag, (contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextParams))), responseCallback);

        /// <summary>
        /// Sends the tag with optional context parameters (no payload when the array is null or empty).
        /// </summary>
        public virtual Task GetAllAsync(int messageTag, object[]? contextParams) //where TResponseData : class
            => SendMessageToServerAsync(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextParams)), GetNextRequestId());

        /// <summary>Posts data wrapped by CreatePostMessage.</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData) where TPostData : class
            => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), GetNextRequestId());

        /// <summary>Posts data wrapped by CreatePostMessage.</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData) //where TPostData : class where TResponseData : class
            => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), GetNextRequestId());

        /// <summary>Posts data wrapped by CreatePostMessage; the response is delivered to the callback.</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData, Func, Task> responseCallback) //where TPostData : class
            => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), responseCallback);

        /// <summary>Posts data wrapped by CreatePostMessage; the response is delivered to the callback.</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData, Func, Task> responseCallback) //where TPostData : class where TResponseData : class
            => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), responseCallback);

        /// <summary>
        /// Creates the appropriate message wrapper for the post data.
        /// Types accepted by IsPrimitiveOrStringOrEnum (strings and all value types) are wrapped in IdMessage.
        /// Everything else is placed directly in a SignalPostJsonDataMessage.
        /// </summary>
        private static ISignalRMessage CreatePostMessage(TPostData postData)
        {
            // The decision is made on the static type parameter, not on the runtime type of postData.
            var type = typeof(TPostData);

            // Primitives, strings, enums, and value types should use IdMessage format
            if (IsPrimitiveOrStringOrEnum(type))
            {
                return new SignalPostJsonDataMessage(new IdMessage(postData!));
            }

            // Complex objects use direct serialization
            return new SignalPostJsonDataMessage(postData);
        }

        /// <summary>
        /// Determines if a type should use IdMessage format (strings and value types).
        /// Must match the logic in AcWebSignalRHubBase.IsPrimitiveOrStringOrEnum.
        /// NOTE: Arrays and collections are NOT included here - they are complex objects for PostDataAsync.
        /// </summary>
        // NOTE(review): enums and DateTime are value types, so the IsEnum and typeof(DateTime)
        // checks are already covered by IsValueType; in effect every struct takes the IdMessage path.
        private static bool IsPrimitiveOrStringOrEnum(Type type)
        {
            return type == typeof(string) || type.IsEnum || type.IsValueType || type == typeof(DateTime);
        }

        /// <summary>
        /// Requests a list via GetAllAsync and, when the response arrives, replaces the content of
        /// <paramref name="intoList"/> with it.
        /// </summary>
        /// <param name="intoList">List that is cleared and then refilled from the response data.</param>
        /// <param name="messageTag">Message tag of the request.</param>
        /// <param name="contextParams">Optional context parameters forwarded to GetAllAsync.</param>
        /// <param name="callback">Optional action invoked after the list has been updated.</param>
        public Task GetAllIntoAsync(List intoList, int messageTag, object[]? contextParams = null, Action? callback = null) where TResponseItem : IEntityGuid
        {
            return GetAllAsync>(messageTag, response =>
            {
                var logText = $"GetAllIntoAsync<{typeof(TResponseItem).Name}>(); status: {response.Status}; dataCount: {response.ResponseData?.Count}; {ConstHelper.NameByValue(TagsName, messageTag)};";

                // The list is cleared even when the response is not successful, so a failed
                // request leaves it empty.
                intoList.Clear();

                if (response.Status == SignalResponseStatus.Success && response.ResponseData != null)
                {
                    Logger.Debug(logText);
                    intoList.AddRange(response.ResponseData);
                }
                else Logger.Error(logText);

                callback?.Invoke();
                return Task.CompletedTask;
            }, contextParams);
        }

        #endregion CRUD

        /// <summary>Sends the tag without a payload and awaits the response.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag) //where TResponse : class
            => SendMessageToServerAsync(messageTag, null, GetNextRequestId());

        /// <summary>Sends the message and awaits the response.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message) //where TResponse : class
            => SendMessageToServerAsync(messageTag, message, GetNextRequestId());

        /// <summary>
        /// Registers a pending request, sends the message, then polls until OnReceiveMessage has
        /// stored a response on the pending model or the TransportSendTimeout wait ends.
        /// Returns the deserialized response data, or default on an error status, timeout or exception.
        /// </summary>
        // NOTE(review), observations from the code below:
        //  - The send happens before the try block, so if it throws, the pending entry registered
        //    just above stays in the dictionary.
        //  - When the send is skipped because the client is not connected (the sending overload only
        //    logs and returns), this method still waits the full timeout.
        //  - The exception created for an Error status is awaited inside the try block and is
        //    therefore caught by the catch below: callers receive default, not an exception.
        //  - startTime starts as DateTime.Now but is subtracted from DateTime.UtcNow after being
        //    replaced by RequestDateTime; presumably RequestDateTime is UTC - confirm.
        protected virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int requestId) //where TResponse : class
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}");

            var startTime = DateTime.Now;
            var requestModel = SignalRRequestModelPool.Get();
            _responseByRequestId[requestId] = requestModel;

            await SendMessageToServerAsync(messageTag, message, requestId);

            try
            {
                // The predicate indexes the dictionary directly; if the entry has been removed
                // meanwhile (e.g. by ClearPendingRequests) the lookup throws and is handled below.
                if (await TaskHelper.WaitToAsync(() => _responseByRequestId[requestId].ResponseByRequestId != null, TransportSendTimeout, MsDelay, MsFirstDelay) &&
                    _responseByRequestId.TryRemove(requestId, out var obj) && obj.ResponseByRequestId is ISignalResponseMessage responseMessage)
                {
                    startTime = obj.RequestDateTime;
                    SignalRRequestModelPool.Return(obj);

                    if (responseMessage.Status == SignalResponseStatus.Error)
                    {
                        var errorText = $"Client SendMessageToServerAsync response error; await; tag: {messageTag}; Status: {responseMessage.Status}; ConnectionState: {GetConnectionState()}; requestId: {requestId}";

                        Logger.Error(errorText);
                        return await Task.FromException(new Exception(errorText));
                    }

                    var responseData = DeserializeResponseData(responseMessage);

                    if (responseData == null && responseMessage.Status == SignalResponseStatus.Success)
                    {
                        // Null response is valid for Success status
                        Logger.Info($"Client received null response. Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                        return default;
                    }

                    // Used for logging only.
                    var serializerType = responseMessage switch
                    {
                        SignalResponseBinaryMessage => "Binary",
                        _ => "JSON"
                    };

                    Logger.Info($"Client deserialized response ({serializerType}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                    return responseData;
                }

                Logger.Error($"Client timeout after: {(DateTime.Now - startTime).TotalSeconds} sec! ConnectionState: {GetConnectionState()}; requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
            }
            catch (Exception ex)
            {
                Logger.Error($"Client SendMessageToServerAsync; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
            }

            // Timeout or exception path: drop the pending entry (if it is still registered)
            // and return its model to the pool.
            if (_responseByRequestId.TryRemove(requestId, out var removedModel))
            {
                SignalRRequestModelPool.Return(removedModel);
            }

            return default;
        }

        /// <summary>
        /// Deserializes response data from either JSON or Binary format.
        /// The format is chosen from the runtime type of the response message; default is returned
        /// when the message carries no data or its type is not recognized.
        /// </summary>
        private static TResponse? DeserializeResponseData(ISignalResponseMessage responseMessage)
        {
            return responseMessage switch
            {
                SignalResponseBinaryMessage binaryMsg when binaryMsg.ResponseData != null => binaryMsg.ResponseData.BinaryTo(),
                SignalResponseJsonMessage jsonMsg when !string.IsNullOrEmpty(jsonMsg.ResponseData) => jsonMsg.ResponseData.JsonTo(),
                ISignalResponseMessage stringMsg when !string.IsNullOrEmpty(stringMsg.ResponseData) => stringMsg.ResponseData.JsonTo(),
                _ => default
            };
        }

        /// <summary>Sends the tag without a payload; the response is delivered to the callback.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag, Func, Task> responseCallback)
            => SendMessageToServerAsync(messageTag, null, responseCallback);

        /// <summary>
        /// Registers a pending request whose model carries a callback delegate, then sends the message.
        /// OnReceiveMessage invokes the delegate when a response with the same request id arrives;
        /// the delegate deserializes the data (on Success) and passes a response message to
        /// <paramref name="responseCallback"/>.
        /// </summary>
        // NOTE(review):
        //  - The Task returned by responseCallback is neither awaited nor observed, so exceptions
        //    from an asynchronous callback are not seen here.
        //  - Nothing in this method removes the pending entry when the send is skipped or fails;
        //    it stays registered until a response arrives or ClearPendingRequests runs.
        public virtual Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, Func, Task> responseCallback)
        {
            // Only logged - the message is still sent.
            if (messageTag == 0)
                Logger.Error($"SendMessageToServerAsync; messageTag == 0");

            var requestId = GetNextRequestId();

            var requestModel = SignalRRequestModelPool.Get(new Action(responseMessage =>
            {
                TResponseData? responseData = default;

                if (responseMessage.Status == SignalResponseStatus.Success)
                {
                    responseData = DeserializeResponseData(responseMessage);
                }
                else Logger.Error($"Client SendMessageToServerAsync response error; callback; Status: {responseMessage.Status}; ConnectionState: {GetConnectionState()}; requestId: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}");

                responseCallback(new SignalResponseMessage(messageTag, responseMessage.Status, responseData));
            }));

            _responseByRequestId[requestId] = requestModel;

            return SendMessageToServerAsync(messageTag, message, requestId);
        }

        /// <summary>
        /// Gets the next unique request ID (from AcDomain.NextUniqueInt32). Override in tests.
        /// </summary>
        protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32;

        /// <summary>
        /// Entry point for messages coming from the server. When <paramref name="requestId"/> matches
        /// a pending request, the response is either stored on the pending model (for a polling caller)
        /// or handed to the registered callback. All other messages are forwarded to MessageReceived.
        /// Exceptions are logged, the pending entry is removed, and the exception is rethrown.
        /// </summary>
        // NOTE(review): the pending model is looked up with ContainsKey and then re-read through the
        // indexer several times. If the entry is removed in between (timeout path of the awaiting
        // sender, or ClearPendingRequests), an indexer read throws and lands in the catch below.
        // A single TryGetValue into a local would avoid the repeated lookups.
        public virtual Task OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId)
        {
            var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}";

            if (messageBytes.Length == 0) Logger.Warning($"message.Length == 0! {logText}");

            try
            {
                if (requestId.HasValue && _responseByRequestId.ContainsKey(requestId.Value))
                {
                    var reqId = requestId.Value;

                    _responseByRequestId[reqId].ResponseDateTime = DateTime.UtcNow;
                    Logger.Debug($"[{_responseByRequestId[reqId].ResponseDateTime.Subtract(_responseByRequestId[reqId].RequestDateTime).TotalMilliseconds:N0}ms][{(messageBytes.Length / 1024)}kb]{logText}");

                    var responseMessage = DeserializeResponseMessage(messageBytes);

                    switch (_responseByRequestId[reqId].ResponseByRequestId)
                    {
                        // A caller is polling for this request: publish the response and leave the
                        // entry in place - the polling caller removes it.
                        case null:
                            _responseByRequestId[reqId].ResponseByRequestId = responseMessage;
                            return Task.CompletedTask;

                        // Callback-based request: remove the entry first, then invoke the callback.
                        case Action messageCallback:
                            if (_responseByRequestId.TryRemove(reqId, out var callbackModel))
                            {
                                SignalRRequestModelPool.Return(callbackModel);
                            }

                            messageCallback.Invoke(responseMessage);
                            return Task.CompletedTask;

                        // Legacy support for string-based callbacks
                        case Action> stringCallback when responseMessage is SignalResponseJsonMessage jsonMsg:
                            if (_responseByRequestId.TryRemove(reqId, out var legacyModel))
                            {
                                SignalRRequestModelPool.Return(legacyModel);
                            }

                            stringCallback.Invoke(jsonMsg);
                            return Task.CompletedTask;

                        default:
                            Logger.Error($"Client OnReceiveMessage switch; unknown message type: {_responseByRequestId[reqId].ResponseByRequestId?.GetType().Name}; {ConstHelper.NameByValue(TagsName, messageTag)}");
                            break;
                    }

                    if (_responseByRequestId.TryRemove(reqId, out var removedModel))
                    {
                        SignalRRequestModelPool.Return(removedModel);
                    }

                    // Faulty request-response case - do not call MessageReceived
                    return Task.CompletedTask;
                }

                // MessageReceived is only called for broadcast/notification messages
                Logger.Info(logText);
                MessageReceived(messageTag, messageBytes).Forget();
            }
            catch (Exception ex)
            {
                if (requestId.HasValue && _responseByRequestId.TryRemove(requestId.Value, out var exModel))
                {
                    SignalRRequestModelPool.Return(exModel);
                }

                Logger.Error($"Client OnReceiveMessage; ConnectionState: {GetConnectionState()}; requestId: {requestId}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
                throw;
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Deserializes a MessagePack response to the appropriate message type (JSON or Binary).
        /// Uses DetectSerializerTypeFromBytes to determine the format of the ResponseData.
        /// </summary>
        protected virtual ISignalResponseMessage DeserializeResponseMessage(byte[] messageBytes)
        {
            // First, try to deserialize as Binary message to check the ResponseData format
            try
            {
                var binaryMsg = messageBytes.MessagePackTo(ContractlessStandardResolver.Options);

                if (binaryMsg.ResponseData != null && binaryMsg.ResponseData.Length > 0)
                {
                    // Use the existing utility to detect if ResponseData is Binary format
                    if (DetectSerializerTypeFromBytes(binaryMsg.ResponseData) == AcSerializerType.Binary)
                    {
                        return binaryMsg;
                    }
                }
            }
            catch
            {
                // Failed to deserialize as Binary message
                // NOTE(review): this swallows every exception type without logging; the JSON
                // fallback below is the only remaining signal if the payload is malformed.
            }

            // Fall back to JSON format
            return messageBytes.MessagePackTo(ContractlessStandardResolver.Options);
        }
    }
}