using System.Collections.Concurrent; using AyCode.Core; using AyCode.Core.Extensions; using AyCode.Core.Helpers; using AyCode.Core.Loggers; using AyCode.Core.Serializers.Jsons; using AyCode.Interfaces.Entities; using Microsoft.AspNetCore.Http.Connections; using Microsoft.AspNetCore.SignalR.Client; namespace AyCode.Services.SignalRs { public abstract class AcSignalRClientBase : IAcSignalRHubClient { private readonly ConcurrentDictionary _responseByRequestId = new(); protected readonly HubConnection? HubConnection; protected readonly AcLoggerBase Logger; protected abstract Task MessageReceived(int messageTag, byte[] messageBytes); public int MsDelay = 25; public int MsFirstDelay = 50; public int ConnectionTimeout = 10000; public int TransportSendTimeout = 60000; private const string TagsName = "SignalRTags"; protected AcSignalRClientBase(string fullHubName, AcLoggerBase logger) { Logger = logger; Logger.Detail(fullHubName); HubConnection = new HubConnectionBuilder() .WithUrl(fullHubName, HttpTransportType.WebSockets, options => { options.TransportMaxBufferSize = 30_000_000; options.ApplicationMaxBufferSize = 30_000_000; options.CloseTimeout = TimeSpan.FromSeconds(10); options.SkipNegotiation = true; }) .WithAutomaticReconnect() .WithStatefulReconnect() .WithKeepAliveInterval(TimeSpan.FromSeconds(60)) .WithServerTimeout(TimeSpan.FromSeconds(180)) .Build(); HubConnection.Closed += HubConnection_Closed; _ = HubConnection.On(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage); } protected AcSignalRClientBase(AcLoggerBase logger) { Logger = logger; HubConnection = null; } private Task HubConnection_Closed(Exception? arg) { if (_responseByRequestId.IsEmpty) Logger.DebugConditional("Client HubConnection_Closed"); else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}"); ClearPendingRequests(); return Task.CompletedTask; } #region Connection State Methods protected virtual HubConnectionState GetConnectionState() => HubConnection?.State ?? HubConnectionState.Disconnected; protected virtual bool IsConnected() => GetConnectionState() == HubConnectionState.Connected; protected virtual Task StartConnectionInternal() => HubConnection?.StartAsync() ?? Task.CompletedTask; protected virtual Task StopConnectionInternal() => HubConnection?.StopAsync() ?? Task.CompletedTask; protected virtual ValueTask DisposeConnectionInternal() => HubConnection?.DisposeAsync() ?? ValueTask.CompletedTask; protected virtual Task SendToHubAsync(int messageTag, byte[]? messageBytes, int? requestId) => HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, messageBytes, requestId) ?? Task.CompletedTask; #endregion #region Protected Test Helpers protected ConcurrentDictionary GetPendingRequests() => _responseByRequestId; protected void ClearPendingRequests() => _responseByRequestId.Clear(); protected void RegisterPendingRequest(int requestId, SignalRRequestModel model) => _responseByRequestId[requestId] = model; #endregion public async Task StartConnection() { if (GetConnectionState() == HubConnectionState.Disconnected) await StartConnectionInternal(); if (!IsConnected()) await TaskHelper.WaitToAsync(IsConnected, ConnectionTimeout, 10, 25); } public async Task StopConnection() { await StopConnectionInternal(); await DisposeConnectionInternal(); } public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, null, GetNextRequestId()); public virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int? requestId) { Logger.DebugConditional($"Client SendMessageToServerAsync sending; {nameof(requestId)}: {requestId}; ConnectionState: {GetConnectionState()}; {ConstHelper.NameByValue(TagsName, messageTag)}"); await StartConnection(); var msgBytes = message != null ? SignalRSerializationHelper.SerializeToBinary(message) : null; if (!IsConnected()) { Logger.Error($"Client SendMessageToServerAsync error! ConnectionState: {GetConnectionState()};"); return; } await SendToHubAsync(messageTag, msgBytes, requestId); } #region CRUD public virtual Task PostAsync(int messageTag, object parameter) => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(parameter)), GetNextRequestId()); public virtual Task PostAsync(int messageTag, object[] parameters) => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(parameters)), GetNextRequestId()); public virtual Task GetByIdAsync(int messageTag, object id) => PostAsync(messageTag, id); public virtual Task GetByIdAsync(int messageTag, object[] ids) => PostAsync(messageTag, ids); /// /// Gets data by ID with async callback response. Callback is second parameter. /// public virtual Task GetByIdAsync(int messageTag, Func responseCallback, object id) => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(id)), responseCallback); /// /// Gets data by IDs with async callback response. Callback is second parameter. /// public virtual Task GetByIdAsync(int messageTag, Func responseCallback, object[] ids) => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(ids)), responseCallback); public virtual Task GetAllAsync(int messageTag) => SendMessageToServerAsync(messageTag); public virtual Task GetAllAsync(int messageTag, object[]? contextParams) => SendMessageToServerAsync(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextParams)), GetNextRequestId()); /// /// Gets all data with async callback response. Callback is second parameter. /// public virtual Task GetAllAsync(int messageTag, Func responseCallback) => SendMessageToServerAsync(messageTag, null, responseCallback); /// /// Gets all data with context params and async callback response. /// public virtual Task GetAllAsync(int messageTag, Func responseCallback, object[]? contextParams) => SendMessageToServerAsync(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextParams)), responseCallback); public virtual Task PostDataAsync(int messageTag, TPostData postData) where TPostData : class => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), GetNextRequestId()); public virtual Task PostDataAsync(int messageTag, TPostData postData) => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), GetNextRequestId()); /// /// Posts data with async callback response. /// public virtual Task PostDataAsync(int messageTag, TPostData postData, Func responseCallback) => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), responseCallback); /// /// Posts data with typed async callback response. /// public virtual Task PostDataAsync(int messageTag, TPostData postData, Func responseCallback) => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), responseCallback); /// /// Posts data and invokes callback with response. Fire-and-forget friendly for background saves. /// public virtual Task PostDataAsync(int messageTag, TPostData postData, Action responseCallback) { var requestId = GetNextRequestId(); var requestModel = SignalRRequestModelPool.Get(responseCallback); _responseByRequestId[requestId] = requestModel; return SendMessageToServerAsync(messageTag, CreatePostMessage(postData), requestId); } private static ISignalRMessage CreatePostMessage(TPostData postData) { var type = typeof(TPostData); if (type == typeof(string) || type.IsEnum || type.IsValueType) return new SignalPostJsonDataMessage(new IdMessage(postData!)); return new SignalPostJsonDataMessage(postData); } public Task GetAllIntoAsync(List intoList, int messageTag, object[]? contextParams = null, Action? callback = null) where TResponseItem : IEntityGuid { return GetAllAsync>(messageTag, contextParams).ContinueWith(task => { var logText = $"GetAllIntoAsync<{typeof(TResponseItem).Name}>(); dataCount: {task.Result?.Count}; {ConstHelper.NameByValue(TagsName, messageTag)};"; intoList.Clear(); if (task.Result != null) { Logger.Debug(logText); intoList.AddRange(task.Result); } else Logger.Error(logText); callback?.Invoke(); }, TaskScheduler.Default); } #endregion public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, null, GetNextRequestId()); public virtual Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message) => SendMessageToServerAsync(messageTag, message, GetNextRequestId()); /// /// Sends message to server with async callback response. /// public virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, Func responseCallback) { var requestId = GetNextRequestId(); var requestModel = SignalRRequestModelPool.Get(responseCallback); _responseByRequestId[requestId] = requestModel; await SendMessageToServerAsync(messageTag, message, requestId); } protected virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int requestId) { Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}"); var startTime = DateTime.Now; var requestModel = SignalRRequestModelPool.Get(); _responseByRequestId[requestId] = requestModel; await SendMessageToServerAsync(messageTag, message, requestId); try { if (await TaskHelper.WaitToAsync(() => _responseByRequestId[requestId].ResponseByRequestId != null, TransportSendTimeout, MsDelay, MsFirstDelay) && _responseByRequestId.TryRemove(requestId, out var obj) && obj.ResponseByRequestId is SignalResponseDataMessage responseMessage) { startTime = obj.RequestDateTime; SignalRRequestModelPool.Return(obj); if (responseMessage.Status == SignalResponseStatus.Error) { var errorText = $"Client SendMessageToServerAsync response error; await; tag: {messageTag}; Status: {responseMessage.Status}; ConnectionState: {GetConnectionState()}; requestId: {requestId}"; Logger.Error(errorText); return await Task.FromException(new Exception(errorText)); } // Special case: when TResponse is SignalResponseDataMessage, return the message itself // instead of trying to deserialize ResponseData (which would cause InvalidCastException) if (typeof(TResponse) == typeof(SignalResponseDataMessage)) { var serializerType = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON"; Logger.Info($"Client returning raw SignalResponseDataMessage ({serializerType}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]"); return (TResponse)(object)responseMessage; } var responseData = responseMessage.GetResponseData(); if (responseData == null && responseMessage.Status == SignalResponseStatus.Success) { Logger.Info($"Client received null response. Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]"); return default; } var serializerType2 = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON"; Logger.Info($"Client deserialized response ({serializerType2}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]"); return responseData; } Logger.Error($"Client timeout after: {(DateTime.Now - startTime).TotalSeconds} sec! ConnectionState: {GetConnectionState()}; requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]"); } catch (Exception ex) { Logger.Error($"Client SendMessageToServerAsync; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex); } if (_responseByRequestId.TryRemove(requestId, out var removedModel)) SignalRRequestModelPool.Return(removedModel); return default; } protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32; public virtual Task OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId) { var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}"; if (messageBytes.Length == 0) Logger.Warning($"message.Length == 0! {logText}"); try { if (requestId.HasValue && _responseByRequestId.TryGetValue(requestId.Value, out var requestModel)) { var reqId = requestId.Value; requestModel.ResponseDateTime = DateTime.UtcNow; Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms][{messageBytes.Length / 1024}kb]{logText}"); var responseMessage = SignalRSerializationHelper.DeserializeFromBinary(messageBytes) ?? new SignalResponseDataMessage(); switch (requestModel.ResponseByRequestId) { case null: requestModel.ResponseByRequestId = responseMessage; return Task.CompletedTask; case Action actionCallback: if (_responseByRequestId.TryRemove(reqId, out var actionModel)) SignalRRequestModelPool.Return(actionModel); actionCallback.Invoke(responseMessage); return Task.CompletedTask; case Func funcCallback: if (_responseByRequestId.TryRemove(reqId, out var funcModel)) SignalRRequestModelPool.Return(funcModel); return funcCallback.Invoke(responseMessage); default: Logger.Error($"Client OnReceiveMessage switch; unknown message type: {requestModel.ResponseByRequestId?.GetType().Name}; {ConstHelper.NameByValue(TagsName, messageTag)}"); break; } if (_responseByRequestId.TryRemove(reqId, out var removedModel)) SignalRRequestModelPool.Return(removedModel); return Task.CompletedTask; } Logger.Info(logText); MessageReceived(messageTag, messageBytes).Forget(); } catch (Exception ex) { if (requestId.HasValue && _responseByRequestId.TryRemove(requestId.Value, out var exModel)) SignalRRequestModelPool.Return(exModel); Logger.Error($"Client OnReceiveMessage; ConnectionState: {GetConnectionState()}; requestId: {requestId}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex); throw; } return Task.CompletedTask; } } }