using System.Collections.Concurrent; using AyCode.Core; using AyCode.Core.Enums; using AyCode.Core.Extensions; using AyCode.Core.Helpers; using AyCode.Core.Loggers; using AyCode.Entities.LogItems; using AyCode.Services.Loggers; using AyCode.Services.SignalRs; using Azure.Core; using MessagePack; using MessagePack.Resolvers; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.DependencyInjection; using TIAM.Entities.Transfers; using TIAMWebApp.Shared.Application.Models.ClientSide; using TIAMWebApp.Shared.Application.Utility; namespace TIAMWebApp.Shared.Application.Services { public class DevAdminSignalClient(IEnumerable logWriters) : AcSignalRClientBase("DevAdminHub", logWriters); public abstract class AcSignalRClientBase : IAcSignalRHubClient { private readonly ConcurrentDictionary _responseByRequestId = new(); protected readonly HubConnection HubConnection; protected readonly LoggerClient Logger; public event Action OnMessageReceived; public event Action OnMessageRequested; protected AcSignalRClientBase(string hubName, IEnumerable logWriters) { Logger = new LoggerClient(GetType().Name, logWriters.ToArray()); HubConnection = new HubConnectionBuilder() .WithUrl($"{Setting.BaseUrl}/{hubName}") //.AddMessagePackProtocol(options => options.SerializerOptions = MessagePackSerializerOptions.Standard.WithSecurity(MessagePackSecurity.UntrustedData)) .Build(); HubConnection.Closed += HubConnection_Closed; _ = HubConnection.On("OnReceiveMessage", OnReceiveMessage); _ = HubConnection.On(nameof(IAcSignalRHubClient.OnRequestMessage), OnRequestMessage); HubConnection.StartAsync().Forget(); } private Task HubConnection_Closed(Exception? arg) { if (_responseByRequestId.IsEmpty) Logger.DebugConditional($"Client HubConnection_Closed"); else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}"); _responseByRequestId.Clear(); return Task.CompletedTask; } public async Task StartConnection() { if (HubConnection.State == HubConnectionState.Disconnected) await HubConnection.StartAsync(); if (HubConnection.State != HubConnectionState.Connected) await TaskHelper.WaitToAsync(() => HubConnection.State == HubConnectionState.Connected, 10000, 25); } public async Task StopConnection() { await HubConnection.StopAsync(); await HubConnection.DisposeAsync(); } public virtual async Task SendMessageToServerAsync(int messageTag, object message, int? requestId = null) { Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"); await StartConnection(); HubConnection.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, message, requestId).Forget(); } public virtual void SendRequestToServerAsync(int messageTag) => SendRequestToServerAsync(messageTag, AcDomain.NextUniqueInt32).Forget(); public virtual async Task SendRequestToServerAsync(int messageTag, int requestId) { Logger.DebugConditional($"Client SendRequestToServerAsync; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"); await StartConnection(); HubConnection.SendAsync(nameof(IAcSignalRHubClient.OnRequestMessage), messageTag, requestId).Forget(); } public virtual async Task SendRequestToServerAsync(int messageTag) { var requestId = AcDomain.NextUniqueInt32; Logger.DebugConditional($"Client SendRequestToServerAsync; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"); _responseByRequestId[requestId] = null; await SendRequestToServerAsync(messageTag, requestId); try { if (await TaskHelper.WaitToAsync(() => _responseByRequestId[requestId] != null, 10000, 25) && _responseByRequestId.TryRemove(requestId, out var obj) && obj is byte[] messagePackBytes) { var json = messagePackBytes.MessagePackTo(ContractlessStandardResolver.Options); return json.JsonTo() ?? default; } } catch (Exception ex) { Logger.Error($"SendRequestToServerAsync; messageTag: {messageTag}; requestId: {requestId}; {ex.Message}", ex); } _responseByRequestId.TryRemove(requestId, out _); return default; } public virtual Task SendRequestToServerAsync(int messageTag, Action responseCallback) { var requestId = AcDomain.NextUniqueInt32; _responseByRequestId[requestId] = responseCallback; return SendRequestToServerAsync(messageTag, requestId); } public virtual Task OnReceiveMessage(int messageTag, byte[] message, int? requestId) { var logText = $"Client OnReceiveMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"; if (message.Length == 0) Logger.Warning($"message.Length == 0! {logText}"); else Logger.Info(logText); try { if (requestId.HasValue && _responseByRequestId.ContainsKey(requestId.Value)) { var reqId = requestId.Value; switch (_responseByRequestId[reqId]) { case null: _responseByRequestId[reqId] = message; return Task.CompletedTask; case Action messagePackCallback: _responseByRequestId.TryRemove(reqId, out _); messagePackCallback.Invoke(message); return Task.CompletedTask; //case Action jsonCallback: // jsonCallback.Invoke(message.MessagePackTo()); // return Task.CompletedTask; default: Logger.Error($"Client OnReceiveMessage switch; unknown message type: {_responseByRequestId[reqId]?.GetType().Name}"); break; } _responseByRequestId.TryRemove(reqId, out _); } OnMessageReceived(messageTag, message, requestId); } catch(Exception ex) { Logger.Error($"Client OnReceiveMessage; messageTag: {messageTag}; requestId: {requestId}; {ex.Message}", ex); throw; } return Task.CompletedTask; } public virtual Task OnRequestMessage(int messageTag, int requestId) { Logger.DebugConditional($"Client OnRequestMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId};"); try { OnMessageRequested(messageTag, requestId); } catch(Exception ex) { Logger.Error($"Client OnReceiveMessage; {nameof(messageTag)}: {messageTag}; {nameof(requestId)}: {requestId}; {ex.Message}", ex); throw; } return Task.CompletedTask; } } }