using System.Collections.Concurrent;
using AyCode.Core;
using AyCode.Core.Extensions;
using AyCode.Core.Helpers;
using AyCode.Core.Loggers;
using AyCode.Core.Serializers;
using AyCode.Core.Serializers.Binaries;
using AyCode.Interfaces.Entities;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace AyCode.Services.SignalRs
{
    /// <summary>
    /// Base class for a SignalR hub client. It builds a WebSocket-only <c>HubConnection</c>,
    /// sends tagged messages to the hub, and pairs incoming <see cref="OnReceiveMessage"/> calls
    /// with pending requests by request id. Messages that match no pending request are handed
    /// to <see cref="MessageReceived"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in this listing the generic type arguments appear to have been stripped
    /// (e.g. <c>ConcurrentDictionary</c> with no arguments, <c>GetAllAsync&gt;(…)</c>, uses of
    /// TResponse / TResponseData / TPostData with no visible declaration). Restore them from the
    /// original file before compiling; the comments below describe only what is visible here.
    /// </remarks>
    public abstract class AcSignalRClientBase : IAcSignalRHubClient
    {
        // Pending requests keyed by request id. OnReceiveMessage looks entries up here to pair
        // a response with the request (or callback) that is waiting for it.
        private readonly ConcurrentDictionary _responseByRequestId = new();

        // Set by the hub constructor; not read anywhere else in the visible code.
        private readonly bool _useAcBinaryProtocol;

        // Null when the instance was created through the logger-only constructor.
        protected readonly HubConnection? HubConnection;
        protected readonly AcLoggerBase Logger;

        /// <summary>
        /// Enable diagnostic logging for binary serialization debugging.
        /// </summary>
        public static bool EnableBinaryDiagnostics { get; set; }

        /// <summary>
        /// Called (fire-and-forget) by <see cref="OnReceiveMessage"/> for messages that do not
        /// match a pending request.
        /// </summary>
        protected abstract Task MessageReceived(int messageTag, SignalParams signalParams, object data);

        // MsDelay / MsFirstDelay are passed to TaskHelper.WaitToAsync while awaiting a response;
        // presumably the poll interval and the initial delay in milliseconds — TODO confirm.
        public int MsDelay = 25;
        public int MsFirstDelay = 50;

        // Passed to TaskHelper.WaitToAsync while waiting for the connected state in StartConnection.
        public int ConnectionTimeout = 10000;

        // Passed to TaskHelper.WaitToAsync as the limit while waiting for a response.
        public int TransportSendTimeout = 60000;

        // First argument of ConstHelper.NameByValue when resolving a message tag for log output.
        private const string TagsName = "SignalRTags";

        /// <summary>
        /// Builds the hub connection for <paramref name="fullHubName"/> and subscribes
        /// <see cref="OnReceiveMessage"/> to the hub method of the same name.
        /// </summary>
        /// <param name="fullHubName">Hub URL passed to <c>WithUrl</c>.</param>
        /// <param name="logger">Logger used by this client and registered with the connection's logging.</param>
        /// <param name="useAcBinaryProtocol">When true, registers <c>AyCodeBinaryHubProtocol</c> in the builder's services.</param>
        protected AcSignalRClientBase(string fullHubName, AcLoggerBase logger, bool useAcBinaryProtocol = true)
        {
            _useAcBinaryProtocol = useAcBinaryProtocol;
            Logger = logger;
            Logger.Detail(fullHubName);

            var hubBuilder = new HubConnectionBuilder()
                .WithUrl(fullHubName, HttpTransportType.WebSockets, options =>
                {
                    options.TransportMaxBufferSize = 30_000_000;
                    options.ApplicationMaxBufferSize = 30_000_000;
                    options.CloseTimeout = TimeSpan.FromSeconds(10);
                    options.SkipNegotiation = true;
                })
                .ConfigureLogging(logging =>
                {
                    // base minimum Microsoft log level
                    logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Information);
                    // register the AcLoggerProvider so that it returns the existing Logger instance
                    logging.AddAcLogger(_ => Logger);
                    // if only AcLogger should be used instead:
                    // logging.ClearProviders();
                    // logging.AddProvider(new AcLoggerProvider(category => Logger));
                })
                .WithAutomaticReconnect()
                .WithStatefulReconnect()
                .WithKeepAliveInterval(TimeSpan.FromSeconds(60))
                .WithServerTimeout(TimeSpan.FromSeconds(180));

            if (useAcBinaryProtocol)
            {
                hubBuilder.Services.AddSingleton(sp =>
                {
                    var binaryOptions = AcBinarySerializerOptions.Default;
                    binaryOptions.BufferWriterChunkSize = 4096;
                    // AcSignalRClientBase — around line 84:
                    var signalLogger = sp.GetRequiredService().CreateLogger();
                    return new AyCodeBinaryHubProtocol(binaryOptions, BinaryProtocolMode.AsyncSegment, signalLogger);
                    // and this can be deleted: AcBinaryHubProtocol.DiagnosticLogger = msg => Logger.Debug(msg);
                });

                // Or, if the options should come from DI as well:
                //hubBuilder.Services.AddSingleton(sp => new AyCodeBinaryHubProtocol(sp.GetRequiredService()));
                AcBinaryHubProtocol.DiagnosticLogger = msg => Logger.Debug(msg);
                AcBinaryDeserializer.DiagnosticLogger = msg => Logger.Debug(msg);
            }

            HubConnection = hubBuilder.Build();
            HubConnection.Closed += HubConnection_Closed;
            _ = HubConnection.On(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage);
        }

        /// <summary>
        /// Creates a client without a hub connection (<c>HubConnection</c> stays null); the
        /// virtual connection members then fall back to their "no connection" results.
        /// </summary>
        protected AcSignalRClientBase(AcLoggerBase logger)
        {
            Logger = logger;
            HubConnection = null;
        }

        /// <summary>
        /// Handler for <c>HubConnection.Closed</c>: logs the close (as a warning when requests
        /// are still pending) and clears all pending requests.
        /// </summary>
        private Task HubConnection_Closed(Exception? arg)
        {
            if (_responseByRequestId.IsEmpty) Logger.DebugConditional("Client HubConnection_Closed");
            else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}");

            // NOTE(review): the cleared models are not handed back to SignalRRequestModelPool,
            // unlike every other removal path in this class — confirm this is intended.
            ClearPendingRequests();
            return Task.CompletedTask;
        }

        #region Connection State Methods

        /// <summary>Returns the connection's state, or Disconnected when there is no connection.</summary>
        protected virtual HubConnectionState GetConnectionState() => HubConnection?.State ?? HubConnectionState.Disconnected;

        /// <summary>True only when <see cref="GetConnectionState"/> reports Connected.</summary>
        protected virtual bool IsConnected() => GetConnectionState() == HubConnectionState.Connected;

        /// <summary>Starts the hub connection; a completed task when there is no connection.</summary>
        protected virtual Task StartConnectionInternal() => HubConnection?.StartAsync() ?? Task.CompletedTask;

        /// <summary>Stops the hub connection; a completed task when there is no connection.</summary>
        protected virtual Task StopConnectionInternal() => HubConnection?.StopAsync() ?? Task.CompletedTask;

        /// <summary>Disposes the hub connection; a completed ValueTask when there is no connection.</summary>
        protected virtual ValueTask DisposeConnectionInternal() => HubConnection?.DisposeAsync() ?? ValueTask.CompletedTask;

        /// <summary>
        /// Sends the message to the hub method named <c>OnReceiveMessage</c>; a completed task
        /// when there is no connection.
        /// </summary>
        protected virtual Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, object? data) => HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, requestId, signalParams, data) ?? Task.CompletedTask;

        #endregion

        #region Protected Test Helpers

        /// <summary>Exposes the live pending-request dictionary (not a copy).</summary>
        protected ConcurrentDictionary GetPendingRequests() => _responseByRequestId;

        /// <summary>Removes every pending request.</summary>
        protected void ClearPendingRequests() => _responseByRequestId.Clear();

        /// <summary>Adds or replaces the pending request stored under <paramref name="requestId"/>.</summary>
        protected void RegisterPendingRequest(int requestId, SignalRRequestModel model) => _responseByRequestId[requestId] = model;

        #endregion

        /// <summary>
        /// Starts the connection when it is Disconnected, then, if it is still not connected,
        /// waits through <c>TaskHelper.WaitToAsync</c> with <see cref="ConnectionTimeout"/>.
        /// </summary>
        public async Task StartConnection()
        {
            if (GetConnectionState() == HubConnectionState.Disconnected) await StartConnectionInternal();
            // The literals 10 and 25 sit in the positions where the response wait passes
            // MsDelay and MsFirstDelay — presumably poll delay and first delay; TODO confirm.
            if (!IsConnected()) await TaskHelper.WaitToAsync(IsConnected, ConnectionTimeout, 10, 25);
        }

        /// <summary>
        /// Stops and then disposes the connection.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the connection field is readonly and is disposed here, so a later
        /// StartConnection would run against a disposed connection — verify callers never restart.
        /// </remarks>
        public async Task StopConnection()
        {
            await StopConnectionInternal();
            await DisposeConnectionInternal();
        }

        /// <summary>Sends a parameterless message with a freshly generated request id.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, (object[]?)null, GetNextRequestId());

        /// <summary>Sends a message through <see cref="SendCoreAsync"/> with a Success-status SignalParams.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag, object[]? parameters, int? requestId) => SendCoreAsync(messageTag, parameters, requestId, new SignalParams { Status = SignalResponseStatus.Success });

        /// <summary>
        /// Core send: takes a pre-built SignalParams (caller controls IsRawBytesData etc.)
        /// </summary>
        /// <remarks>
        /// Ensures the connection is started first. When the client is still not connected it
        /// logs an error and returns without sending and without throwing, so the caller gets
        /// no failure signal.
        /// </remarks>
        protected async Task SendCoreAsync(int messageTag, object[]? parameters, int? requestId, SignalParams signalParams)
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync sending; {nameof(requestId)}: {requestId}; ConnectionState: {GetConnectionState()}; {ConstHelper.NameByValue(TagsName, messageTag)}");

            await StartConnection();

            if (!IsConnected())
            {
                Logger.Error($"Client SendMessageToServerAsync error! ConnectionState: {GetConnectionState()};");
                return;
            }

            // Parameters travel inside signalParams; the hub call's data argument is always null here.
            if (parameters is { Length: > 0 }) signalParams.SetParameterValues(parameters);
            await SendToHubAsync(messageTag, requestId, signalParams, null);
        }

        #region CRUD

        /// <summary>Sends a single parameter with a new request id.</summary>
        public virtual Task PostAsync(int messageTag, object parameter) => SendMessageToServerAsync(messageTag, [parameter], GetNextRequestId());

        /// <summary>Sends the given parameters with a new request id.</summary>
        public virtual Task PostAsync(int messageTag, object[] parameters) => SendMessageToServerAsync(messageTag, parameters, GetNextRequestId());

        /// <summary>Forwards to <c>PostAsync</c> with the id as the single parameter.</summary>
        public virtual Task GetByIdAsync(int messageTag, object id) => PostAsync(messageTag, id);

        /// <summary>Forwards to <c>PostAsync</c> with the ids as parameters.</summary>
        public virtual Task GetByIdAsync(int messageTag, object[] ids) => PostAsync(messageTag, ids);

        /// <summary>
        /// Gets data by ID with async callback response. Callback is second parameter.
        /// </summary>
        public virtual Task GetByIdAsync(int messageTag, Func responseCallback, object id) => SendMessageToServerAsync(messageTag, [id], responseCallback);

        /// <summary>
        /// Gets data by IDs with async callback response. Callback is second parameter.
        /// </summary>
        public virtual Task GetByIdAsync(int messageTag, Func responseCallback, object[] ids) => SendMessageToServerAsync(messageTag, ids, responseCallback);

        /// <summary>Requests all data for the tag, without parameters.</summary>
        public virtual Task GetAllAsync(int messageTag) => SendMessageToServerAsync(messageTag);

        /// <summary>Requests all data for the tag; an empty <paramref name="contextParams"/> array is sent as null.</summary>
        public virtual Task GetAllAsync(int messageTag, object[]? contextParams) => SendMessageToServerAsync(messageTag, contextParams is { Length: > 0 } ? contextParams : null, GetNextRequestId());

        /// <summary>
        /// Gets all data with async callback response. Callback is second parameter.
        /// </summary>
        public virtual Task GetAllAsync(int messageTag, Func responseCallback) => SendMessageToServerAsync(messageTag, null, responseCallback);

        /// <summary>
        /// Gets all data with context params and async callback response.
        /// </summary>
        public virtual Task GetAllAsync(int messageTag, Func responseCallback, object[]? contextParams) => SendMessageToServerAsync(messageTag, contextParams is { Length: > 0 } ? contextParams : null, responseCallback);

        /// <summary>
        /// Streams responses from the hub method "OnReceiveStreamMessage". Ends the sequence
        /// immediately (after logging an error) when there is no connection or it is not connected.
        /// </summary>
        /// <param name="messageTag">Tag sent as the first hub argument.</param>
        /// <param name="contextParams">Optional parameters; each is serialized to binary, then the resulting array is serialized again.</param>
        /// <param name="cancellationToken">Passed both to the hub stream call and to the enumeration.</param>
        /// <exception cref="Exception">A streamed message carries <c>SignalResponseStatus.Error</c>.</exception>
        public virtual async IAsyncEnumerable StreamAllAsync(int messageTag, object[]? contextParams = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            await StartConnection();

            if (HubConnection == null || !IsConnected())
            {
                Logger.Error($"Client StreamAllAsync error! ConnectionState: {GetConnectionState()};");
                yield break;
            }

            var msgBytes = contextParams is { Length: > 0 }
                ? SignalRSerializationHelper.SerializeToBinary(
                    contextParams.Select(p => SignalRSerializationHelper.SerializeToBinary(p)).ToArray())
                : null;

            var stream = HubConnection.StreamAsync(
                "OnReceiveStreamMessage", messageTag, msgBytes, cancellationToken);

            await foreach (var bytes in stream.WithCancellation(cancellationToken))
            {
                if (bytes == null) continue;

                // Raw mode: when the caller asked for byte[], yield the payload without deserializing.
                if (typeof(TResponseData) == typeof(byte[]))
                {
                    yield return (TResponseData)(object)bytes;
                    continue;
                }

                var responseMessage = SignalRSerializationHelper.DeserializeFromBinary(bytes);
                // A payload that deserializes to null is skipped silently.
                if (responseMessage != null)
                {
                    if (responseMessage.Status == SignalResponseStatus.Error)
                    {
                        var errorText = $"Client StreamAllAsync error; tag: {messageTag}; Status: {responseMessage.Status}";
                        Logger.Error(errorText);
                        throw new Exception(errorText);
                    }

                    yield return responseMessage.GetResponseData();
                }
            }
        }

        /// <summary>Sends <paramref name="postData"/> as the single parameter with a new request id (overload constrained to reference types).</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData) where TPostData : class => SendMessageToServerAsync(messageTag, [postData!], GetNextRequestId());

        /// <summary>Sends <paramref name="postData"/> as the single parameter with a new request id.</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData) => SendMessageToServerAsync(messageTag, [postData!], GetNextRequestId());

        /// <summary>
        /// Posts data with async callback response.
        /// </summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData, Func responseCallback) => SendMessageToServerAsync(messageTag, [postData!], responseCallback);

        /// <summary>
        /// Posts data with typed async callback response.
        /// </summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData, Func responseCallback) => SendMessageToServerAsync(messageTag, [postData!], responseCallback);

        /// <summary>
        /// Posts data and invokes callback with response. Fire-and-forget friendly for background saves.
        /// </summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData, Action responseCallback)
        {
            var requestId = GetNextRequestId();
            var requestModel = SignalRRequestModelPool.Get(responseCallback);
            _responseByRequestId[requestId] = requestModel;
            // NOTE(review): this must bind to the public (int? requestId) overload. The protected
            // (int requestId) overload re-registers the id with a callback-less model and would
            // discard the callback registered above — confirm overload resolution.
            return SendMessageToServerAsync(messageTag, [postData!], requestId);
        }

        /// <summary>
        /// Streams responses from the hub method "OnReceiveStreamMessage" for a single posted value.
        /// Ends the sequence immediately (after logging an error) when there is no connection or
        /// it is not connected.
        /// </summary>
        /// <param name="messageTag">Tag sent as the first hub argument.</param>
        /// <param name="postData">Serialized to binary, wrapped in a one-element array and serialized again.</param>
        /// <param name="cancellationToken">Passed both to the hub stream call and to the enumeration.</param>
        /// <exception cref="Exception">A streamed message carries <c>SignalResponseStatus.Error</c>.</exception>
        public virtual async IAsyncEnumerable StreamPostDataAsync(int messageTag, TPostData postData, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            await StartConnection();

            if (HubConnection == null || !IsConnected())
            {
                Logger.Error($"Client StreamPostDataAsync error! ConnectionState: {GetConnectionState()};");
                yield break;
            }

            var msgBytes = SignalRSerializationHelper.SerializeToBinary(
                new[] { SignalRSerializationHelper.SerializeToBinary(postData!) });

            var stream = HubConnection.StreamAsync(
                "OnReceiveStreamMessage", messageTag, msgBytes, cancellationToken);

            await foreach (var bytes in stream.WithCancellation(cancellationToken))
            {
                if (bytes == null) continue;

                // Raw mode: when the caller asked for byte[], yield the payload without deserializing.
                if (typeof(TResponseData) == typeof(byte[]))
                {
                    yield return (TResponseData)(object)bytes;
                    continue;
                }

                var responseMessage = SignalRSerializationHelper.DeserializeFromBinary(bytes);
                // A payload that deserializes to null is skipped silently.
                if (responseMessage != null)
                {
                    if (responseMessage.Status == SignalResponseStatus.Error)
                    {
                        var errorText = $"Client StreamPostDataAsync error; tag: {messageTag}; Status: {responseMessage.Status}";
                        Logger.Error(errorText);
                        throw new Exception(errorText);
                    }

                    yield return responseMessage.GetResponseData();
                }
            }
        }

        /// <summary>
        /// Requests all items for the tag and replaces the contents of <paramref name="intoList"/>
        /// with the result, then invokes <paramref name="callback"/>.
        /// </summary>
        /// <remarks>
        /// The list is cleared even when the result is null (logged as an error).
        /// NOTE(review): the continuation reads <c>task.Result</c>, so a faulted request task
        /// would rethrow inside the continuation before the list is touched — confirm intended.
        /// </remarks>
        public Task GetAllIntoAsync(List intoList, int messageTag, object[]? contextParams = null, Action? callback = null) where TResponseItem : IEntityGuid
        {
            return GetAllAsync>(messageTag, contextParams).ContinueWith(task =>
            {
                var logText = $"GetAllIntoAsync<{typeof(TResponseItem).Name}>(); dataCount: {task.Result?.Count}; {ConstHelper.NameByValue(TagsName, messageTag)};";
                intoList.Clear();

                if (task.Result != null)
                {
                    Logger.Debug(logText);
                    intoList.AddRange(task.Result);
                }
                else Logger.Error(logText);

                callback?.Invoke();
            }, TaskScheduler.Default);
        }

        #endregion

        /// <summary>Sends a parameterless request with a new request id.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, (object[]?)null, GetNextRequestId());

        /// <summary>Sends a request with the given parameters and a new request id.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag, object[]? parameters) => SendMessageToServerAsync(messageTag, parameters, GetNextRequestId());

        /// <summary>
        /// Sends message to server with async callback response.
        /// </summary>
        public virtual async Task SendMessageToServerAsync(int messageTag, object[]? parameters, Func responseCallback)
        {
            var requestId = GetNextRequestId();
            var requestModel = SignalRRequestModelPool.Get(responseCallback);
            _responseByRequestId[requestId] = requestModel;
            // NOTE(review): this must bind to the public (int? requestId) overload. The protected
            // (int requestId) overload re-registers the id with a callback-less model and would
            // discard the callback registered above — confirm overload resolution.
            await SendMessageToServerAsync(messageTag, parameters, requestId);
        }

        /// <summary>
        /// Sends a request and waits (through <c>TaskHelper.WaitToAsync</c>, limited by
        /// <see cref="TransportSendTimeout"/>) until <see cref="OnReceiveMessage"/> stores the
        /// response for <paramref name="requestId"/>, then returns the response data.
        /// </summary>
        /// <returns>
        /// The response data; the response message itself when TResponse is
        /// <c>SignalResponseDataMessage</c>; <c>default</c> when the response data is null,
        /// when no response was observed, or when an exception was caught and logged.
        /// </returns>
        /// <remarks>
        /// An Error-status response is surfaced via <c>Task.FromException</c> inside the try
        /// block, so the surrounding catch logs it and the method returns <c>default</c>.
        /// NOTE(review): SendCoreAsync returns silently when not connected, yet the wait below
        /// still runs — presumably for the full timeout; confirm.
        /// </remarks>
        protected virtual async Task SendMessageToServerAsync(int messageTag, object[]? parameters, int requestId)
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}");

            // NOTE(review): local time here, while the success path measures with DateTime.UtcNow
            // against obj.RequestDateTime; only the timeout log uses this value unchanged.
            var startTime = DateTime.Now;
            var requestModel = SignalRRequestModelPool.Get();
            _responseByRequestId[requestId] = requestModel;

            await SendCoreAsync(messageTag, parameters, requestId, new SignalParams { Status = SignalResponseStatus.Success, IsRawBytesData = typeof(TResponse) == typeof(byte[]) });

            try
            {
                // NOTE(review): the predicate uses the dictionary indexer; if the entry has been
                // removed meanwhile (e.g. ClearPendingRequests on connection close) the lookup
                // presumably throws and ends up in the catch below — confirm.
                if (await TaskHelper.WaitToAsync(() => _responseByRequestId[requestId].ResponseByRequestId != null, TransportSendTimeout, MsDelay, MsFirstDelay) && _responseByRequestId.TryRemove(requestId, out var obj) && obj.ResponseByRequestId is SignalResponseDataMessage responseMessage)
                {
                    startTime = obj.RequestDateTime;
                    SignalRRequestModelPool.Return(obj);

                    if (responseMessage.Status == SignalResponseStatus.Error)
                    {
                        var errorText = $"Client SendMessageToServerAsync response error; await; tag: {messageTag}; Status: {responseMessage.Status}; ConnectionState: {GetConnectionState()}; requestId: {requestId}";
                        Logger.Error(errorText);
                        return await Task.FromException(new Exception(errorText));
                    }

                    // Special case: when TResponse is SignalResponseDataMessage, return the message itself
                    // instead of trying to deserialize ResponseData (which would cause InvalidCastException)
                    if (typeof(TResponse) == typeof(SignalResponseDataMessage))
                    {
                        var serializerType = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON";
                        Logger.Info($"Client returning raw SignalResponseDataMessage ({serializerType}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                        return (TResponse)(object)responseMessage;
                    }

                    var responseData = responseMessage.GetResponseData();

                    if (responseData == null && responseMessage.Status == SignalResponseStatus.Success)
                    {
                        Logger.Info($"Client received null response. Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                        return default;
                    }

                    var serializerType2 = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON";
                    Logger.Info($"Client deserialized response ({serializerType2}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                    return responseData;
                }

                // Reached when the wait reported false, the entry was already gone, or the stored
                // value was not a SignalResponseDataMessage; all three are logged as a timeout.
                Logger.Error($"Client timeout after: {(DateTime.Now - startTime).TotalSeconds} sec! ConnectionState: {GetConnectionState()}; requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
            }
            catch (Exception ex)
            {
                Logger.Error($"Client SendMessageToServerAsync; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
            }

            // Failure path: drop the pending entry (if still present) and hand the model back to the pool.
            if (_responseByRequestId.TryRemove(requestId, out var removedModel)) SignalRRequestModelPool.Return(removedModel);
            return default;
        }

        /// <summary>Returns the next request id, taken from <c>AcDomain.NextUniqueInt32</c>.</summary>
        protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32;

        /// <summary>
        /// Entry point for messages pushed by the hub. When <paramref name="requestId"/> matches
        /// a pending request, the response is either stored on the request model (for a waiting
        /// sender) or passed to the registered callback. Otherwise the message is forwarded to
        /// <see cref="MessageReceived"/> without awaiting it.
        /// </summary>
        /// <param name="messageTag">Message tag; used here for log output and forwarded to <see cref="MessageReceived"/>.</param>
        /// <param name="requestId">Id of the request this message answers, if any.</param>
        /// <param name="signalParams">Supplies Status and DataSerializerType for the response message.</param>
        /// <param name="data">Payload stored as <c>RawResponseData</c> on the response message.</param>
        /// <returns>The callback's task for async callbacks; otherwise a completed task.</returns>
        public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, object data)
        {
            var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}";

            try
            {
                if (requestId.HasValue && _responseByRequestId.TryGetValue(requestId.Value, out var requestModel))
                {
                    var reqId = requestId.Value;
                    requestModel.ResponseDateTime = DateTime.UtcNow;
                    Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms]{logText}");

                    // Protocol already deserialized data to typed object or byte[]
                    var responseMessage = new SignalResponseDataMessage { Status = signalParams.Status, DataSerializerType = signalParams.DataSerializerType, RawResponseData = data };

                    switch (requestModel.ResponseByRequestId)
                    {
                        // No callback registered: store the response for the sender that is
                        // waiting on this entry; that sender removes the entry itself.
                        case null:
                            requestModel.ResponseByRequestId = responseMessage;
                            return Task.CompletedTask;
                        // Synchronous callback: the entry is removed and pooled before the callback runs.
                        case Action actionCallback:
                            if (_responseByRequestId.TryRemove(reqId, out var actionModel)) SignalRRequestModelPool.Return(actionModel);
                            actionCallback.Invoke(responseMessage);
                            return Task.CompletedTask;
                        // Asynchronous callback: same removal, and the callback's task is returned to the caller.
                        case Func funcCallback:
                            if (_responseByRequestId.TryRemove(reqId, out var funcModel)) SignalRRequestModelPool.Return(funcModel);
                            return funcCallback.Invoke(responseMessage);
                        default:
                            Logger.Error($"Client OnReceiveMessage switch; unknown message type: {requestModel.ResponseByRequestId?.GetType().Name}; {ConstHelper.NameByValue(TagsName, messageTag)}");
                            break;
                    }

                    // Only the default (unknown type) case falls through to here: discard the entry.
                    if (_responseByRequestId.TryRemove(reqId, out var removedModel)) SignalRRequestModelPool.Return(removedModel);
                    return Task.CompletedTask;
                }

                Logger.Info(logText);
                MessageReceived(messageTag, signalParams, data).Forget();
            }
            catch (Exception ex)
            {
                // Make sure a failed dispatch does not leave the pending entry behind, then rethrow.
                if (requestId.HasValue && _responseByRequestId.TryRemove(requestId.Value, out var exModel)) SignalRRequestModelPool.Return(exModel);
                Logger.Error($"Client OnReceiveMessage; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
                throw;
            }

            return Task.CompletedTask;
        }
    }
}