using System.Collections.Concurrent; using AyCode.Core; using AyCode.Core.Extensions; using AyCode.Core.Helpers; using AyCode.Core.Loggers; using AyCode.Services.Loggers; using AyCode.Services.SignalRs; using MessagePack.Resolvers; using Microsoft.AspNetCore.SignalR.Client; namespace AyCode.Blazor.Components.Services { public abstract class AcSignalRClientBase : IAcSignalRHubClient { private readonly ConcurrentDictionary _responseByRequestId = new(); protected readonly HubConnection HubConnection; protected readonly AcLoggerBase Logger; public event Action OnMessageReceived = null!; //public event Action OnMessageRequested; public int Timeout = 10000; protected AcSignalRClientBase(string fullHubName, AcLoggerBase logger) { Logger = logger; HubConnection = new HubConnectionBuilder() .WithUrl(fullHubName) //.AddMessagePackProtocol(options => { // options.SerializerOptions = MessagePackSerializerOptions.Standard // .WithResolver(MessagePack.Resolvers.StandardResolver.Instance) // .WithSecurity(MessagePackSecurity.UntrustedData) // .WithCompression(MessagePackCompression.Lz4Block) // .WithCompressionMinLength(256);}) .Build(); HubConnection.Closed += HubConnection_Closed; _ = HubConnection.On(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage); //_ = HubConnection.On(nameof(IAcSignalRHubClient.OnRequestMessage), OnRequestMessage); HubConnection.StartAsync().Forget(); } private Task HubConnection_Closed(Exception? arg) { if (_responseByRequestId.IsEmpty) Logger.DebugConditional($"Client HubConnection_Closed"); else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}"); _responseByRequestId.Clear(); return Task.CompletedTask; } public async Task StartConnection() { if (HubConnection.State == HubConnectionState.Disconnected) await HubConnection.StartAsync(); if (HubConnection.State != HubConnectionState.Connected) await TaskHelper.WaitToAsync(() => HubConnection.State == HubConnectionState.Connected, Timeout, 25); } public async Task StopConnection() { await HubConnection.StopAsync(); await HubConnection.DisposeAsync(); } public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, null, AcDomain.NextUniqueInt32); public virtual Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int? requestId) { Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"); return StartConnection().ContinueWith(x => { var msgp = message?.ToMessagePack(ContractlessStandardResolver.Options); return HubConnection.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, msgp, requestId); }); } #region CRUD public virtual Task GetByIdAsync(int messageTag, params Guid[] ids) where TResponseData : class => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(ids)), AcDomain.NextUniqueInt32); public virtual Task GetByIdAsync(int messageTag, Func, Task> responseCallback, params Guid[] ids) => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(new IdMessage(ids)), responseCallback); public virtual Task GetAllAsync(int messageTag) where TResponseData : class => SendMessageToServerAsync(messageTag); public virtual Task GetAllAsync(int messageTag, Func, Task> responseCallback) => SendMessageToServerAsync(messageTag, null, responseCallback); public virtual Task GetAllAsync(int messageTag, Func, Task> responseCallback, params Guid[]? contextIds) => SendMessageToServerAsync(messageTag, (contextIds == null || contextIds.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextIds))), responseCallback); public virtual Task GetAllAsync(int messageTag, params Guid[]? contextIds) where TResponseData : class => SendMessageToServerAsync(messageTag, contextIds == null || contextIds.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextIds)), AcDomain.NextUniqueInt32); public virtual Task PostDataAsync(int messageTag, TPostData postData) where TPostData : class => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(postData), AcDomain.NextUniqueInt32); public virtual Task PostDataAsync(int messageTag, TPostData postData, Func, Task> responseCallback) where TPostData : class => SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage(postData), responseCallback); #endregion CRUD public virtual Task SendMessageToServerAsync(int messageTag) where TResponse : class => SendMessageToServerAsync(messageTag, null, AcDomain.NextUniqueInt32); public virtual Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message) where TResponse : class => SendMessageToServerAsync(messageTag, message, AcDomain.NextUniqueInt32); protected virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int requestId) where TResponse : class { Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"); _responseByRequestId[requestId] = null; await SendMessageToServerAsync(messageTag, message, requestId); try { if (await TaskHelper.WaitToAsync(() => _responseByRequestId[requestId] != null, Timeout, 25) && _responseByRequestId.TryRemove(requestId, out var obj) && obj is ISignalResponseMessage responseMessage) { if (responseMessage.Status == SignalResponseStatus.Error || responseMessage.ResponseData == null) { var errorText = $"Client SendMessageToServerAsync response error; await; tag: {messageTag}; Status: {responseMessage.Status}; requestId: {requestId};"; Logger.Error(errorText); //TODO: Ideiglenes, majd a ResponseMessage-et kell visszaadni a Status miatt! - J. throw new Exception(errorText); //return default; } return responseMessage.ResponseData.JsonTo(); } } catch (Exception ex) { Logger.Error($"SendMessageToServerAsync; messageTag: {messageTag}; requestId: {requestId}; {ex.Message}", ex); } _responseByRequestId.TryRemove(requestId, out _); return default; } public virtual Task SendMessageToServerAsync(int messageTag, Func, Task> responseCallback) => SendMessageToServerAsync(messageTag, null, responseCallback); public virtual Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, Func, Task> responseCallback) { if (messageTag == 0) Logger.Error($"SendMessageToServerAsync; messageTag == 0"); var requestId = AcDomain.NextUniqueInt32; _responseByRequestId[requestId] = new Action>(responseMessage => { TResponseData? responseData = default; if (responseMessage.Status == SignalResponseStatus.Success) { responseData = string.IsNullOrEmpty(responseMessage.ResponseData) ? default : responseMessage.ResponseData.JsonTo(); } else Logger.Error($"Client SendMessageToServerAsync response error; callback; tag: {messageTag}; Status: {responseMessage.Status}; requestId: {requestId};"); responseCallback(new SignalResponseMessage(responseMessage.Status, responseData)); }); return SendMessageToServerAsync(messageTag, message, requestId); } public virtual Task OnReceiveMessage(int messageTag, byte[] message, int? requestId) { var logText = $"Client OnReceiveMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"; if (message.Length == 0) Logger.Warning($"message.Length == 0! {logText}"); else Logger.Info(logText); try { if (requestId.HasValue && _responseByRequestId.ContainsKey(requestId.Value)) { var reqId = requestId.Value; var responseMessage = message.MessagePackTo(ContractlessStandardResolver.Options); switch (_responseByRequestId[reqId]) { case null: _responseByRequestId[reqId] = responseMessage; return Task.CompletedTask; case Action> messagePackCallback: _responseByRequestId.TryRemove(reqId, out _); messagePackCallback.Invoke(responseMessage); return Task.CompletedTask; //case Action jsonCallback: // _responseByRequestId.TryRemove(reqId, out _); // jsonCallback.Invoke(responseMessage); // return Task.CompletedTask; default: Logger.Error($"Client OnReceiveMessage switch; unknown message type: {_responseByRequestId[reqId]?.GetType().Name}"); break; } _responseByRequestId.TryRemove(reqId, out _); } OnMessageReceived(messageTag, message, requestId); } catch (Exception ex) { if (requestId.HasValue) _responseByRequestId.TryRemove(requestId.Value, out _); Logger.Error($"Client OnReceiveMessage; messageTag: {messageTag}; requestId: {requestId}; {ex.Message}", ex); throw; } return Task.CompletedTask; } //public virtual Task OnRequestMessage(int messageTag, int requestId) //{ // Logger.DebugConditional($"Client OnRequestMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"); // try // { // OnMessageRequested(messageTag, requestId); // } // catch(Exception ex) // { // Logger.Error($"Client OnReceiveMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId}; {ex.Message}", ex); // throw; // } // return Task.CompletedTask; //} } }