using System.Collections.Concurrent;
using AyCode.Core;
using AyCode.Core.Extensions;
using AyCode.Core.Helpers;
using AyCode.Core.Loggers;
using AyCode.Interfaces.Entities;
using AyCode.Services.Loggers;
using AyCode.Services.SignalRs;
using MessagePack.Resolvers;
using Microsoft.AspNetCore.SignalR.Client;

namespace AyCode.Blazor.Components.Services
{
    /// <summary>
    /// Bookkeeping entry for one outstanding request, keyed by request id in <see cref="AcSignalRClientBase"/>.
    /// </summary>
    public class SignalRRequestModel
    {
        /// <summary>UTC time at which this entry was created (set by the parameterless constructor).</summary>
        public DateTime RequestDateTime;

        /// <summary>UTC time at which the matching response was received; assigned by the client's receive handler.</summary>
        public DateTime ResponseDateTime;

        /// <summary>
        /// Either <c>null</c> (an awaiting caller polls this slot until the response message is stored here),
        /// or a callback delegate that the receive handler invokes with the response message.
        /// </summary>
        public object? ResponseByRequestId = null;

        /// <summary>Creates an entry with an empty response slot, stamped with the current UTC time.</summary>
        public SignalRRequestModel()
        {
            RequestDateTime = DateTime.UtcNow;
        }

        /// <summary>Creates an entry whose response slot is pre-filled, typically with a callback delegate.</summary>
        /// <param name="responseByRequestId">The value to place into <see cref="ResponseByRequestId"/>.</param>
        public SignalRRequestModel(object responseByRequestId) : this()
        {
            ResponseByRequestId = responseByRequestId;
        }
    }

    /// <summary>
    /// Base class of a SignalR hub client: sends tagged, MessagePack-serialized messages to the hub and correlates
    /// the responses with their requests through a request id, either by awaiting the result or by a callback.
    /// </summary>
    public abstract class AcSignalRClientBase : IAcSignalRHubClient
    {
        // Outstanding requests by request id; entries are removed on response, on failure, on timeout, or when the connection closes.
        private readonly ConcurrentDictionary<int, SignalRRequestModel> _responseByRequestId = new();

        protected readonly HubConnection HubConnection;
        protected readonly AcLoggerBase Logger;

        /// <summary>Raised for received messages that were not consumed by an awaiting caller or a callback.</summary>
        public event Action<int, byte[], int?> OnMessageReceived = null!;
        //public event Action<int, int> OnMessageRequested;

        /// <summary>Maximum wait, in milliseconds, for the connection to come up and for an awaited response.</summary>
        public int Timeout = 10000;

        // Name passed to ConstHelper.NameByValue to turn a message tag into readable log text.
        private readonly string _tagsName = "SignalRTags";

        /// <summary>Builds the hub connection, wires the close and receive handlers and starts connecting in the background.</summary>
        /// <param name="fullHubName">URL of the hub to connect to.</param>
        /// <param name="logger">Logger used for all diagnostics of this client.</param>
        protected AcSignalRClientBase(string fullHubName, AcLoggerBase logger)
        {
            Logger = logger;

            HubConnection = new HubConnectionBuilder()
                .WithUrl(fullHubName)
                //.AddMessagePackProtocol(options => {
                //    options.SerializerOptions = MessagePackSerializerOptions.Standard
                //        .WithResolver(MessagePack.Resolvers.StandardResolver.Instance)
                //        .WithSecurity(MessagePackSecurity.UntrustedData)
                //        .WithCompression(MessagePackCompression.Lz4Block)
                //        .WithCompressionMinLength(256);})
                .Build();

            HubConnection.Closed += HubConnection_Closed;

            _ = HubConnection.On<int, byte[], int?>(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage);
            //_ = HubConnection.On<int, int>(nameof(IAcSignalRHubClient.OnRequestMessage), OnRequestMessage);

            HubConnection.StartAsync().Forget();
        }

        /// <summary>Logs the close event and drops every outstanding request, since no response can arrive for them any more.</summary>
        private Task HubConnection_Closed(Exception? arg)
        {
            if (_responseByRequestId.IsEmpty) Logger.DebugConditional($"Client HubConnection_Closed");
            else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}");

            _responseByRequestId.Clear();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Starts the connection when it is disconnected, then waits (up to <see cref="Timeout"/> ms, polling every 25 ms)
        /// until the connection reports the connected state.
        /// </summary>
        public async Task StartConnection()
        {
            if (HubConnection.State == HubConnectionState.Disconnected)
                await HubConnection.StartAsync();

            if (HubConnection.State != HubConnectionState.Connected)
                await TaskHelper.WaitToAsync(() => HubConnection.State == HubConnectionState.Connected, Timeout, 25);
        }

        /// <summary>Stops and then disposes the hub connection.</summary>
        public async Task StopConnection()
        {
            await HubConnection.StopAsync();
            await HubConnection.DisposeAsync();
        }

        /// <summary>Sends a message that consists of a tag only, using a freshly generated request id.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag)
            => SendMessageToServerAsync(messageTag, null, AcDomain.NextUniqueInt32);

        /// <summary>
        /// Ensures the connection is started, serializes <paramref name="message"/> with MessagePack and sends it to the hub.
        /// </summary>
        /// <param name="messageTag">Tag identifying the operation.</param>
        /// <param name="message">Optional payload; <c>null</c> sends no payload.</param>
        /// <param name="requestId">Optional id used to correlate the response with this request.</param>
        /// <returns>A task that completes when the send itself has completed; start and send failures surface through it.</returns>
        public virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int? requestId)
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(_tagsName, messageTag)}");

            // Awaited instead of ContinueWith: the returned task now covers the send as well, and a failed start
            // no longer falls through into a send attempt whose exception nobody observes.
            await StartConnection();

            var msgp = message?.ToMessagePack(ContractlessStandardResolver.Options);
            await HubConnection.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, msgp, requestId);
        }

        #region CRUD

        /// <summary>Requests a single item by id and awaits the deserialized response.</summary>
        public virtual Task<TResponseData?> GetByIdAsync<TResponseData>(int messageTag, object id) where TResponseData : class
            => SendMessageToServerAsync<TResponseData>(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(id)), AcDomain.NextUniqueInt32);

        /// <summary>Requests a single item by id; the response is delivered to <paramref name="responseCallback"/>.</summary>
        public virtual Task GetByIdAsync<TResponseData>(int messageTag, Func<ISignalResponseMessage<TResponseData?>, Task> responseCallback, object id)
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(id)), responseCallback);

        /// <summary>Requests by several ids and awaits the deserialized response.</summary>
        public virtual Task<TResponseData?> GetByIdAsync<TResponseData>(int messageTag, object[] ids) where TResponseData : class
            => SendMessageToServerAsync<TResponseData>(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(ids)), AcDomain.NextUniqueInt32);

        /// <summary>Requests by several ids; the response is delivered to <paramref name="responseCallback"/>.</summary>
        public virtual Task GetByIdAsync<TResponseData>(int messageTag, Func<ISignalResponseMessage<TResponseData?>, Task> responseCallback, object[] ids)
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(ids)), responseCallback);

        /// <summary>Sends a tag-only request and awaits the deserialized response.</summary>
        public virtual Task<TResponseData?> GetAllAsync<TResponseData>(int messageTag) where TResponseData : class
            => SendMessageToServerAsync<TResponseData>(messageTag);

        /// <summary>Sends a tag-only request; the response is delivered to <paramref name="responseCallback"/>.</summary>
        public virtual Task GetAllAsync<TResponseData>(int messageTag, Func<ISignalResponseMessage<TResponseData?>, Task> responseCallback)
            => SendMessageToServerAsync(messageTag, null, responseCallback);

        /// <summary>
        /// Like the callback overload without parameters, but sends <paramref name="contextParams"/> along when it is non-empty.
        /// </summary>
        public virtual Task GetAllAsync<TResponseData>(int messageTag, Func<ISignalResponseMessage<TResponseData?>, Task> responseCallback, object[]? contextParams)
            => SendMessageToServerAsync(messageTag, (contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage<IdMessage>(new IdMessage(contextParams))), responseCallback);

        /// <summary>
        /// Like the awaited overload without parameters, but sends <paramref name="contextParams"/> along when it is non-empty.
        /// </summary>
        public virtual Task<TResponseData?> GetAllAsync<TResponseData>(int messageTag, object[]? contextParams) where TResponseData : class
            => SendMessageToServerAsync<TResponseData>(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage<IdMessage>(new IdMessage(contextParams)), AcDomain.NextUniqueInt32);

        /// <summary>Posts <paramref name="postData"/> and awaits a response of the same type.</summary>
        public virtual Task<TPostData?> PostDataAsync<TPostData>(int messageTag, TPostData postData) where TPostData : class
            => SendMessageToServerAsync<TPostData>(messageTag, new SignalPostJsonDataMessage<TPostData>(postData), AcDomain.NextUniqueInt32);

        /// <summary>Posts <paramref name="postData"/> and awaits a response of type <typeparamref name="TResponseData"/>.</summary>
        public virtual Task<TResponseData?> PostDataAsync<TPostData, TResponseData>(int messageTag, TPostData postData) where TPostData : class where TResponseData : class
            => SendMessageToServerAsync<TResponseData>(messageTag, new SignalPostJsonDataMessage<TPostData>(postData), AcDomain.NextUniqueInt32);

        /// <summary>Posts <paramref name="postData"/>; a response of the same type is delivered to <paramref name="responseCallback"/>.</summary>
        public virtual Task PostDataAsync<TPostData>(int messageTag, TPostData postData, Func<ISignalResponseMessage<TPostData?>, Task> responseCallback) where TPostData : class
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage<TPostData>(postData), responseCallback);

        /// <summary>Posts <paramref name="postData"/>; the response is delivered to <paramref name="responseCallback"/>.</summary>
        public virtual Task PostDataAsync<TPostData, TResponseData>(int messageTag, TPostData postData, Func<ISignalResponseMessage<TResponseData?>, Task> responseCallback) where TPostData : class where TResponseData : class
            => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage<TPostData>(postData), responseCallback);

        /// <summary>
        /// Requests a list and, on a successful non-null response, replaces the content of <paramref name="intoList"/> with it.
        /// On any other outcome an error is logged and <paramref name="intoList"/> is left untouched.
        /// </summary>
        /// <param name="intoList">The list that receives the response items.</param>
        /// <param name="messageTag">Tag identifying the operation.</param>
        /// <param name="contextParams">Optional parameters sent along with the request.</param>
        public Task GetAllIntoAsync<T>(List<T> intoList, int messageTag, object[]? contextParams = null) where T : IEntityGuid
        {
            return GetAllAsync<List<T>>(messageTag, response =>
            {
                if (response.Status != SignalResponseStatus.Success || response.ResponseData == null)
                {
                    Logger.Error($"GetAllIntoAsync<{typeof(T).Name}>(); status: {response.Status}; dataCount: {response.ResponseData?.Count}; {ConstHelper.NameByValue(_tagsName, messageTag)};");
                    return Task.CompletedTask;
                }

                intoList.Clear();
                intoList.AddRange(response.ResponseData);

                return Task.CompletedTask;
            }, contextParams);
        }

        #endregion CRUD

        /// <summary>Sends a tag-only request and awaits the deserialized response.</summary>
        public virtual Task<TResponse?> SendMessageToServerAsync<TResponse>(int messageTag) where TResponse : class
            => SendMessageToServerAsync<TResponse>(messageTag, null, AcDomain.NextUniqueInt32);

        /// <summary>Sends <paramref name="message"/> and awaits the deserialized response.</summary>
        public virtual Task<TResponse?> SendMessageToServerAsync<TResponse>(int messageTag, ISignalRMessage? message) where TResponse : class
            => SendMessageToServerAsync<TResponse>(messageTag, message, AcDomain.NextUniqueInt32);

        /// <summary>
        /// Registers a pending request, sends the message and polls (every 25 ms, up to <see cref="Timeout"/> ms)
        /// until the receive handler stores the response for <paramref name="requestId"/>.
        /// </summary>
        /// <param name="messageTag">Tag identifying the operation.</param>
        /// <param name="message">Optional payload.</param>
        /// <param name="requestId">Id under which the response is expected.</param>
        /// <returns>
        /// The response data deserialized from JSON, or <c>default</c> when the send fails, the wait times out,
        /// the connection closes meanwhile, or the response reports an error or carries no data. Failures are logged, not thrown.
        /// </returns>
        protected virtual async Task<TResponse?> SendMessageToServerAsync<TResponse>(int messageTag, ISignalRMessage? message, int requestId) where TResponse : class
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(_tagsName, messageTag)}");

            _responseByRequestId[requestId] = new SignalRRequestModel();

            try
            {
                // The send is inside the try so that a failed send cannot leave the pending entry behind.
                await SendMessageToServerAsync(messageTag, message, requestId);

                // Stop waiting as soon as the response is stored, or the entry has vanished (the close handler clears the dictionary).
                var finished = await TaskHelper.WaitToAsync(
                    () => !_responseByRequestId.TryGetValue(requestId, out var pending) || pending.ResponseByRequestId != null, Timeout, 25);

                if (finished && _responseByRequestId.TryRemove(requestId, out var obj) && obj.ResponseByRequestId is ISignalResponseMessage<string> responseMessage)
                {
                    if (responseMessage.Status == SignalResponseStatus.Error || responseMessage.ResponseData == null)
                    {
                        var errorText = $"Client SendMessageToServerAsync response error; await; tag: {messageTag}; Status: {responseMessage.Status}; requestId: {requestId};";
                        Logger.Error(errorText);

                        //TODO: Temporary; later the ResponseMessage itself should be returned because of the Status. - J.
                        return default;
                    }

                    return responseMessage.ResponseData.JsonTo<TResponse>();
                }

                Logger.Warning($"Client SendMessageToServerAsync; no usable response (timeout, closed connection or unexpected response type); requestId: {requestId}; {ConstHelper.NameByValue(_tagsName, messageTag)}");
            }
            catch (Exception ex)
            {
                Logger.Error($"SendMessageToServerAsync; requestId: {requestId}; {ex.Message}; {ConstHelper.NameByValue(_tagsName, messageTag)}", ex);
            }
            finally
            {
                _responseByRequestId.TryRemove(requestId, out _);
            }

            return default;
        }

        /// <summary>Sends a tag-only request; the response is delivered to <paramref name="responseCallback"/>.</summary>
        public virtual Task SendMessageToServerAsync<TResponseData>(int messageTag, Func<ISignalResponseMessage<TResponseData?>, Task> responseCallback)
            => SendMessageToServerAsync(messageTag, null, responseCallback);

        /// <summary>
        /// Registers <paramref name="responseCallback"/> under a fresh request id and sends the message. When the response
        /// arrives, its JSON data is deserialized (only on success status and non-empty data) and passed to the callback
        /// together with the response status.
        /// </summary>
        /// <param name="messageTag">Tag identifying the operation; a value of 0 is logged as an error but still sent.</param>
        /// <param name="message">Optional payload.</param>
        /// <param name="responseCallback">Invoked with the response once it has been received.</param>
        /// <returns>A task that completes when the message has been sent, not when the response arrives.</returns>
        public virtual async Task SendMessageToServerAsync<TResponseData>(int messageTag, ISignalRMessage? message, Func<ISignalResponseMessage<TResponseData?>, Task> responseCallback)
        {
            if (messageTag == 0)
                Logger.Error($"SendMessageToServerAsync; messageTag == 0");

            var requestId = AcDomain.NextUniqueInt32;

            _responseByRequestId[requestId] = new SignalRRequestModel(new Action<ISignalResponseMessage<string>>(responseMessage =>
            {
                TResponseData? responseData = default;

                if (responseMessage.Status == SignalResponseStatus.Success)
                {
                    responseData = string.IsNullOrEmpty(responseMessage.ResponseData) ? default : responseMessage.ResponseData.JsonTo<TResponseData>();
                }
                else Logger.Error($"Client SendMessageToServerAsync response error; callback; Status: {responseMessage.Status}; requestId: {requestId}; {ConstHelper.NameByValue(_tagsName, messageTag)}");

                // NOTE(review): the task returned by the callback is not awaited, so an exception thrown
                // asynchronously inside the callback is not observed here.
                _ = responseCallback(new SignalResponseMessage<TResponseData?>(messageTag, responseMessage.Status, responseData));
            }));

            try
            {
                await SendMessageToServerAsync(messageTag, message, requestId);
            }
            catch
            {
                // No response can arrive for a message that was never sent; do not keep the callback registered.
                _responseByRequestId.TryRemove(requestId, out _);
                throw;
            }
        }

        /// <summary>
        /// Handles a message pushed by the hub. When <paramref name="requestId"/> matches a pending request, the message is
        /// deserialized and either stored for the awaiting caller or passed to the registered callback; otherwise
        /// <see cref="OnMessageReceived"/> is raised.
        /// </summary>
        /// <param name="messageTag">Tag identifying the operation.</param>
        /// <param name="message">MessagePack-serialized payload.</param>
        /// <param name="requestId">Id of the request this message answers, if any.</param>
        /// <exception cref="Exception">Any exception raised while processing is logged and rethrown.</exception>
        public virtual Task OnReceiveMessage(int messageTag, byte[] message, int? requestId)
        {
            var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(_tagsName, messageTag)}";

            if (message.Length == 0) Logger.Warning($"message.Length == 0! {logText}");

            try
            {
                // One lookup: the entry may be removed concurrently (timeout, connection close), so the
                // model reference is captured once instead of re-reading the dictionary through its indexer.
                if (requestId.HasValue && _responseByRequestId.TryGetValue(requestId.Value, out var pending))
                {
                    var reqId = requestId.Value;

                    pending.ResponseDateTime = DateTime.UtcNow;
                    Logger.Info($"[{pending.ResponseDateTime.Subtract(pending.RequestDateTime).TotalMilliseconds:N0}ms][{(message.Length/1024)}kb]{logText}");

                    // NOTE(review): concrete response type restored from usage - confirm it matches the hub's response message type.
                    var responseMessage = message.MessagePackTo<SignalResponseJsonMessage>(ContractlessStandardResolver.Options);

                    switch (pending.ResponseByRequestId)
                    {
                        case null:
                            // An awaiting caller polls this slot and removes the entry itself.
                            pending.ResponseByRequestId = responseMessage;
                            return Task.CompletedTask;

                        case Action<ISignalResponseMessage<string>> messagePackCallback:
                            _responseByRequestId.TryRemove(reqId, out _);

                            messagePackCallback.Invoke(responseMessage);
                            return Task.CompletedTask;

                        //case Action jsonCallback:
                        //    _responseByRequestId.TryRemove(reqId, out _);

                        //    jsonCallback.Invoke(responseMessage);
                        //    return Task.CompletedTask;

                        default:
                            Logger.Error($"Client OnReceiveMessage switch; unknown message type: {pending.ResponseByRequestId?.GetType().Name}; {ConstHelper.NameByValue(_tagsName, messageTag)}");
                            break;
                    }

                    _responseByRequestId.TryRemove(reqId, out _);
                }
                else Logger.Info(logText);

                // Null-conditional: the event has no subscribers until a consumer attaches one.
                OnMessageReceived?.Invoke(messageTag, message, requestId);
            }
            catch (Exception ex)
            {
                if (requestId.HasValue)
                    _responseByRequestId.TryRemove(requestId.Value, out _);

                Logger.Error($"Client OnReceiveMessage; requestId: {requestId}; {ex.Message}; {ConstHelper.NameByValue(_tagsName, messageTag)}", ex);
                throw;
            }

            return Task.CompletedTask;
        }

        //public virtual Task OnRequestMessage(int messageTag, int requestId)
        //{
        //    Logger.DebugConditional($"Client OnRequestMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};");

        //    try
        //    {
        //        OnMessageRequested(messageTag, requestId);
        //    }
        //    catch(Exception ex)
        //    {
        //        Logger.Error($"Client OnReceiveMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId}; {ex.Message}", ex);
        //        throw;
        //    }

        //    return Task.CompletedTask;
        //}
    }
}