using System.Collections.Concurrent;
using AyCode.Core;
using AyCode.Core.Extensions;
using AyCode.Core.Helpers;
using AyCode.Core.Loggers;
using AyCode.Core.Serializers;
using AyCode.Interfaces.Entities;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace AyCode.Services.SignalRs
{
    /// <summary>
    /// Base class for SignalR hub clients. It builds and owns the <c>HubConnection</c>, sends tagged
    /// messages to the hub, and matches incoming messages to pending requests by request id.
    /// Messages that do not belong to a pending request are handed to <see cref="MessageReceived"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the reviewed text of this file carries no generic type-argument lists
    /// (for example on <c>ConcurrentDictionary</c>, <c>Task</c>, <c>Func</c>, <c>AddSingleton</c>).
    /// Those tokens are kept exactly as found here; confirm them against the original file.
    /// </remarks>
    public abstract class AcSignalRClientBase : IAcSignalRHubClient
    {
        // Requests that are still waiting for a hub response, keyed by request id.
        private readonly ConcurrentDictionary _responseByRequestId = new();

        // Remembers the constructor argument; not read anywhere else in this class.
        private readonly bool _useAcBinaryProtocol;

        /// <summary>The underlying connection; <c>null</c> when the logger-only constructor was used.</summary>
        protected readonly HubConnection? HubConnection;

        /// <summary>Logger used for all diagnostics of this client.</summary>
        protected readonly AcLoggerBase Logger;

        /// <summary>
        /// Called for every incoming message that is not the answer to a pending request.
        /// </summary>
        protected abstract Task MessageReceived(int messageTag, SignalParams signalParams, byte[] data);

        // Passed as the 3rd argument to TaskHelper.WaitToAsync while waiting for a response
        // (presumably the polling interval in ms - confirm against TaskHelper).
        public int MsDelay = 25;

        // Passed as the 4th argument to TaskHelper.WaitToAsync while waiting for a response
        // (presumably the initial delay in ms - confirm against TaskHelper).
        public int MsFirstDelay = 50;

        // Timeout handed to TaskHelper.WaitToAsync while waiting for the connection (presumably ms).
        public int ConnectionTimeout = 10000;

        // Timeout handed to TaskHelper.WaitToAsync while waiting for a response (presumably ms).
        public int TransportSendTimeout = 60000;

        // Name passed to ConstHelper.NameByValue to turn a message tag into readable log text.
        private const string TagsName = "SignalRTags";

        /// <summary>
        /// Builds the hub connection for <paramref name="fullHubName"/> (WebSockets only, negotiation
        /// skipped, automatic and stateful reconnect enabled) and subscribes to
        /// <see cref="IAcSignalRHubClient.OnReceiveMessage"/>. The connection is not started here.
        /// </summary>
        /// <param name="fullHubName">Hub URL handed to <c>WithUrl</c>.</param>
        /// <param name="logger">Logger used by this client and by the SignalR logging pipeline.</param>
        /// <param name="useAcBinaryProtocol">When true, an extra singleton service is registered on the builder.</param>
        protected AcSignalRClientBase(string fullHubName, AcLoggerBase logger, bool useAcBinaryProtocol = true)
        {
            _useAcBinaryProtocol = useAcBinaryProtocol;
            Logger = logger;
            Logger.Detail(fullHubName);

            var hubBuilder = new HubConnectionBuilder()
                .WithUrl(fullHubName, HttpTransportType.WebSockets, options =>
                {
                    options.TransportMaxBufferSize = 30_000_000;
                    options.ApplicationMaxBufferSize = 30_000_000;
                    options.CloseTimeout = TimeSpan.FromSeconds(10);
                    options.SkipNegotiation = true;
                })
                .ConfigureLogging(logging =>
                {
                    // Baseline minimum log level for the Microsoft logging pipeline.
                    logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Error);

                    // Register the AcLoggerProvider so that it hands back the existing Logger instance.
                    logging.AddAcLogger(_ => Logger);

                    // If only AcLogger should be used instead:
                    // logging.ClearProviders();
                    // logging.AddProvider(new AcLoggerProvider(category => Logger));
                })
                .WithAutomaticReconnect()
                .WithStatefulReconnect()
                .WithKeepAliveInterval(TimeSpan.FromSeconds(60))
                .WithServerTimeout(TimeSpan.FromSeconds(180));

            if (useAcBinaryProtocol)
            {
                // NOTE(review): the type arguments of this registration are not visible in the reviewed text.
                hubBuilder.Services.AddSingleton();
            }

            HubConnection = hubBuilder.Build();
            HubConnection.Closed += HubConnection_Closed;

            _ = HubConnection.On(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage);
        }

        /// <summary>
        /// Creates a client without a hub connection; every connection member then falls back to
        /// its "no connection" default (see the Connection State Methods region).
        /// </summary>
        protected AcSignalRClientBase(AcLoggerBase logger)
        {
            Logger = logger;
            HubConnection = null;
        }

        /// <summary>
        /// Handler for <c>HubConnection.Closed</c>: logs how many requests were still pending and drops them all.
        /// </summary>
        private Task HubConnection_Closed(Exception? arg)
        {
            if (_responseByRequestId.IsEmpty) Logger.DebugConditional("Client HubConnection_Closed");
            else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}");

            ClearPendingRequests();
            return Task.CompletedTask;
        }

        #region Connection State Methods

        /// <summary>Current connection state; <c>Disconnected</c> when there is no connection object.</summary>
        protected virtual HubConnectionState GetConnectionState() => HubConnection?.State ?? HubConnectionState.Disconnected;

        /// <summary>True only when <see cref="GetConnectionState"/> reports <c>Connected</c>.</summary>
        protected virtual bool IsConnected() => GetConnectionState() == HubConnectionState.Connected;

        /// <summary>Starts the connection; completes immediately when there is no connection object.</summary>
        protected virtual Task StartConnectionInternal() => HubConnection?.StartAsync() ?? Task.CompletedTask;

        /// <summary>Stops the connection; completes immediately when there is no connection object.</summary>
        protected virtual Task StopConnectionInternal() => HubConnection?.StopAsync() ?? Task.CompletedTask;

        /// <summary>Disposes the connection; completes immediately when there is no connection object.</summary>
        protected virtual ValueTask DisposeConnectionInternal() => HubConnection?.DisposeAsync() ?? ValueTask.CompletedTask;

        /// <summary>
        /// Sends one message to the hub method named after <see cref="IAcSignalRHubClient.OnReceiveMessage"/>;
        /// completes immediately when there is no connection object.
        /// </summary>
        protected virtual Task SendToHubAsync(int messageTag, int? requestId, SignalParams signalParams, byte[]? messageBytes)
            => HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, requestId, signalParams, messageBytes) ?? Task.CompletedTask;

        #endregion

        #region Protected Test Helpers

        /// <summary>Exposes the live pending-request map (not a copy).</summary>
        protected ConcurrentDictionary GetPendingRequests() => _responseByRequestId;

        /// <summary>Drops every pending request without returning the models to the pool.</summary>
        protected void ClearPendingRequests() => _responseByRequestId.Clear();

        /// <summary>Registers (or overwrites) the pending request stored under <paramref name="requestId"/>.</summary>
        protected void RegisterPendingRequest(int requestId, SignalRRequestModel model) => _responseByRequestId[requestId] = model;

        #endregion

        /// <summary>
        /// Removes the pending request stored under <paramref name="requestId"/>, if there is one,
        /// and returns its model to <c>SignalRRequestModelPool</c>.
        /// </summary>
        private void ReleasePendingRequest(int requestId)
        {
            if (_responseByRequestId.TryRemove(requestId, out var model)) SignalRRequestModelPool.Return(model);
        }

        /// <summary>
        /// Starts the connection when it is <c>Disconnected</c>, then waits through
        /// <c>TaskHelper.WaitToAsync</c> (bounded by <see cref="ConnectionTimeout"/>) until it reports connected.
        /// The method does not throw by itself when the wait ends without a connection.
        /// </summary>
        public async Task StartConnection()
        {
            if (GetConnectionState() == HubConnectionState.Disconnected) await StartConnectionInternal();

            if (!IsConnected()) await TaskHelper.WaitToAsync(IsConnected, ConnectionTimeout, 10, 25);
        }

        /// <summary>Stops the connection and then disposes it.</summary>
        public async Task StopConnection()
        {
            await StopConnectionInternal();
            await DisposeConnectionInternal();
        }

        /// <summary>Sends a parameterless message under a freshly generated request id.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, (object[]?)null, GetNextRequestId());

        /// <summary>
        /// Ensures the connection is started and sends one message to the hub.
        /// When the client is still not connected afterwards, an error is logged and the method
        /// returns WITHOUT sending and WITHOUT throwing.
        /// </summary>
        /// <param name="messageTag">Tag identifying the message.</param>
        /// <param name="parameters">Optional values copied into the <c>SignalParams</c> of the message.</param>
        /// <param name="requestId">Correlation id forwarded to the hub; may be null.</param>
        public virtual async Task SendMessageToServerAsync(int messageTag, object[]? parameters, int? requestId)
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync sending; {nameof(requestId)}: {requestId}; ConnectionState: {GetConnectionState()}; {ConstHelper.NameByValue(TagsName, messageTag)}");

            await StartConnection();

            if (!IsConnected())
            {
                Logger.Error($"Client SendMessageToServerAsync error! ConnectionState: {GetConnectionState()};");
                return;
            }

            var signalParams = new SignalParams { Status = SignalResponseStatus.Success };
            if (parameters is { Length: > 0 }) signalParams.SetParameterValues(parameters);

            await SendToHubAsync(messageTag, requestId, signalParams, null);
        }

        #region CRUD

        /// <summary>Sends a single parameter under a fresh request id.</summary>
        public virtual Task PostAsync(int messageTag, object parameter) => SendMessageToServerAsync(messageTag, [parameter], GetNextRequestId());

        /// <summary>Sends several parameters under a fresh request id.</summary>
        public virtual Task PostAsync(int messageTag, object[] parameters) => SendMessageToServerAsync(messageTag, parameters, GetNextRequestId());

        /// <summary>Requests data by a single id; forwards to <c>PostAsync</c>.</summary>
        public virtual Task GetByIdAsync(int messageTag, object id) => PostAsync(messageTag, id);

        /// <summary>Requests data by several ids; forwards to <c>PostAsync</c>.</summary>
        public virtual Task GetByIdAsync(int messageTag, object[] ids) => PostAsync(messageTag, ids);

        /// <summary>
        /// Gets data by ID with async callback response. Callback is second parameter.
        /// </summary>
        public virtual Task GetByIdAsync(int messageTag, Func responseCallback, object id) => SendMessageToServerAsync(messageTag, [id], responseCallback);

        /// <summary>
        /// Gets data by IDs with async callback response. Callback is second parameter.
        /// </summary>
        public virtual Task GetByIdAsync(int messageTag, Func responseCallback, object[] ids) => SendMessageToServerAsync(messageTag, ids, responseCallback);

        /// <summary>Requests all data for <paramref name="messageTag"/> without parameters.</summary>
        public virtual Task GetAllAsync(int messageTag) => SendMessageToServerAsync(messageTag);

        /// <summary>Requests all data; an empty <paramref name="contextParams"/> array is sent as null.</summary>
        public virtual Task GetAllAsync(int messageTag, object[]? contextParams) => SendMessageToServerAsync(messageTag, contextParams is { Length: > 0 } ? contextParams : null, GetNextRequestId());

        /// <summary>
        /// Gets all data with async callback response. Callback is second parameter.
        /// </summary>
        public virtual Task GetAllAsync(int messageTag, Func responseCallback) => SendMessageToServerAsync(messageTag, null, responseCallback);

        /// <summary>
        /// Gets all data with context params and async callback response.
        /// An empty <paramref name="contextParams"/> array is sent as null.
        /// </summary>
        public virtual Task GetAllAsync(int messageTag, Func responseCallback, object[]? contextParams) => SendMessageToServerAsync(messageTag, contextParams is { Length: > 0 } ? contextParams : null, responseCallback);

        /// <summary>
        /// Streams items from the hub method "OnReceiveStreamMessage". Null chunks are skipped;
        /// when the requested item type is <c>byte[]</c> the raw chunk is yielded, otherwise the chunk is
        /// deserialized and its response data is yielded.
        /// Yields nothing (after logging an error) when there is no usable connection.
        /// </summary>
        /// <param name="messageTag">Tag identifying the request.</param>
        /// <param name="contextParams">Optional parameters; each one is serialized to binary, then the array of results is serialized again.</param>
        /// <param name="cancellationToken">Cancels the hub stream and the enumeration.</param>
        /// <exception cref="Exception">A streamed message carries <c>SignalResponseStatus.Error</c>.</exception>
        public virtual async IAsyncEnumerable StreamAllAsync(int messageTag, object[]? contextParams = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            await StartConnection();

            if (HubConnection == null || !IsConnected())
            {
                Logger.Error($"Client StreamAllAsync error! ConnectionState: {GetConnectionState()};");
                yield break;
            }

            var msgBytes = contextParams is { Length: > 0 }
                ? SignalRSerializationHelper.SerializeToBinary(
                    contextParams.Select(p => SignalRSerializationHelper.SerializeToBinary(p)).ToArray())
                : null;

            var stream = HubConnection.StreamAsync(
                "OnReceiveStreamMessage", messageTag, msgBytes, cancellationToken);

            await foreach (var bytes in stream.WithCancellation(cancellationToken))
            {
                if (bytes == null) continue;

                // Raw mode: the caller asked for the bytes themselves, so skip deserialization.
                if (typeof(TResponseData) == typeof(byte[]))
                {
                    yield return (TResponseData)(object)bytes;
                    continue;
                }

                var responseMessage = SignalRSerializationHelper.DeserializeFromBinary(bytes);
                if (responseMessage != null)
                {
                    if (responseMessage.Status == SignalResponseStatus.Error)
                    {
                        var errorText = $"Client StreamAllAsync error; tag: {messageTag}; Status: {responseMessage.Status}";
                        Logger.Error(errorText);
                        throw new Exception(errorText);
                    }

                    yield return responseMessage.GetResponseData();
                }
            }
        }

        /// <summary>Posts <paramref name="postData"/> as the single parameter under a fresh request id.</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData) where TPostData : class => SendMessageToServerAsync(messageTag, [postData!], GetNextRequestId());

        /// <summary>Posts <paramref name="postData"/> as the single parameter under a fresh request id.</summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData) => SendMessageToServerAsync(messageTag, [postData!], GetNextRequestId());

        /// <summary>
        /// Posts data with async callback response.
        /// </summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData, Func responseCallback) => SendMessageToServerAsync(messageTag, [postData!], responseCallback);

        /// <summary>
        /// Posts data with typed async callback response.
        /// </summary>
        public virtual Task PostDataAsync(int messageTag, TPostData postData, Func responseCallback) => SendMessageToServerAsync(messageTag, [postData!], responseCallback);

        /// <summary>
        /// Posts data and invokes callback with response. Fire-and-forget friendly for background saves.
        /// The callback is registered as a pending request before sending; when the send throws, the
        /// registration is released again and the exception is rethrown.
        /// </summary>
        public virtual async Task PostDataAsync(int messageTag, TPostData postData, Action responseCallback)
        {
            var requestId = GetNextRequestId();
            var requestModel = SignalRRequestModelPool.Get(responseCallback);
            _responseByRequestId[requestId] = requestModel;

            try
            {
                await SendMessageToServerAsync(messageTag, [postData!], requestId);
            }
            catch
            {
                // A failed send must not leave the callback registered: nothing else removes it.
                ReleasePendingRequest(requestId);
                throw;
            }
        }

        /// <summary>
        /// Streams the hub's answer to a posted value through the hub method "OnReceiveStreamMessage".
        /// Null chunks are skipped; when the requested item type is <c>byte[]</c> the raw chunk is yielded,
        /// otherwise the chunk is deserialized and its response data is yielded.
        /// Yields nothing (after logging an error) when there is no usable connection.
        /// </summary>
        /// <param name="messageTag">Tag identifying the request.</param>
        /// <param name="postData">Value serialized to binary, wrapped in a one-element array and serialized again.</param>
        /// <param name="cancellationToken">Cancels the hub stream and the enumeration.</param>
        /// <exception cref="Exception">A streamed message carries <c>SignalResponseStatus.Error</c>.</exception>
        public virtual async IAsyncEnumerable StreamPostDataAsync(int messageTag, TPostData postData, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            await StartConnection();

            if (HubConnection == null || !IsConnected())
            {
                Logger.Error($"Client StreamPostDataAsync error! ConnectionState: {GetConnectionState()};");
                yield break;
            }

            var msgBytes = SignalRSerializationHelper.SerializeToBinary(
                new[] { SignalRSerializationHelper.SerializeToBinary(postData!) });

            var stream = HubConnection.StreamAsync(
                "OnReceiveStreamMessage", messageTag, msgBytes, cancellationToken);

            await foreach (var bytes in stream.WithCancellation(cancellationToken))
            {
                if (bytes == null) continue;

                // Raw mode: the caller asked for the bytes themselves, so skip deserialization.
                if (typeof(TResponseData) == typeof(byte[]))
                {
                    yield return (TResponseData)(object)bytes;
                    continue;
                }

                var responseMessage = SignalRSerializationHelper.DeserializeFromBinary(bytes);
                if (responseMessage != null)
                {
                    if (responseMessage.Status == SignalResponseStatus.Error)
                    {
                        var errorText = $"Client StreamPostDataAsync error; tag: {messageTag}; Status: {responseMessage.Status}";
                        Logger.Error(errorText);
                        throw new Exception(errorText);
                    }

                    yield return responseMessage.GetResponseData();
                }
            }
        }

        /// <summary>
        /// Runs <c>GetAllAsync</c> and, in a continuation scheduled on <c>TaskScheduler.Default</c>,
        /// clears <paramref name="intoList"/>, refills it when a result came back (logging at Debug),
        /// logs at Error when the result is null, and finally invokes <paramref name="callback"/>.
        /// </summary>
        public Task GetAllIntoAsync(List intoList, int messageTag, object[]? contextParams = null, Action? callback = null) where TResponseItem : IEntityGuid
        {
            return GetAllAsync>(messageTag, contextParams).ContinueWith(task =>
            {
                var logText = $"GetAllIntoAsync<{typeof(TResponseItem).Name}>(); dataCount: {task.Result?.Count}; {ConstHelper.NameByValue(TagsName, messageTag)};";

                // The list is emptied even when no data came back.
                intoList.Clear();

                if (task.Result != null)
                {
                    Logger.Debug(logText);
                    intoList.AddRange(task.Result);
                }
                else Logger.Error(logText);

                callback?.Invoke();
            }, TaskScheduler.Default);
        }

        #endregion

        /// <summary>Sends a parameterless request under a fresh request id.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag) => SendMessageToServerAsync(messageTag, (object[]?)null, GetNextRequestId());

        /// <summary>Sends a request with parameters under a fresh request id.</summary>
        public virtual Task SendMessageToServerAsync(int messageTag, object[]? parameters) => SendMessageToServerAsync(messageTag, parameters, GetNextRequestId());

        /// <summary>
        /// Sends message to server with async callback response.
        /// The callback is registered as a pending request before sending; when the send throws, the
        /// registration is released again and the exception is rethrown.
        /// </summary>
        public virtual async Task SendMessageToServerAsync(int messageTag, object[]? parameters, Func responseCallback)
        {
            var requestId = GetNextRequestId();
            var requestModel = SignalRRequestModelPool.Get(responseCallback);
            _responseByRequestId[requestId] = requestModel;

            try
            {
                await SendMessageToServerAsync(messageTag, parameters, requestId);
            }
            catch
            {
                // A failed send must not leave the callback registered: nothing else removes it.
                ReleasePendingRequest(requestId);
                throw;
            }
        }

        /// <summary>
        /// Sends a request and waits for its response: registers a pending request, sends the message,
        /// then waits through <c>TaskHelper.WaitToAsync</c> (bounded by <see cref="TransportSendTimeout"/>)
        /// until <see cref="OnReceiveMessage"/> has stored a response for <paramref name="requestId"/>.
        /// </summary>
        /// <returns>
        /// The raw <c>SignalResponseDataMessage</c> when <c>TResponse</c> is that type, otherwise the response
        /// data taken from it. <c>default</c> on timeout, on an error status, and on any exception raised
        /// while waiting or decoding (those are logged, not rethrown).
        /// </returns>
        /// <remarks>An exception thrown by the send itself is rethrown after the pending request is released.</remarks>
        protected virtual async Task SendMessageToServerAsync(int messageTag, object[]? parameters, int requestId)
        {
            Logger.DebugConditional($"Client SendMessageToServerAsync; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}");

            // UTC so that the elapsed-time logs below cannot be skewed by DST or time-zone changes.
            var startTime = DateTime.UtcNow;

            var requestModel = SignalRRequestModelPool.Get();
            _responseByRequestId[requestId] = requestModel;

            try
            {
                await SendMessageToServerAsync(messageTag, parameters, requestId);
            }
            catch
            {
                // A failed send must not leave the request registered: nothing else removes it.
                ReleasePendingRequest(requestId);
                throw;
            }

            try
            {
                if (await TaskHelper.WaitToAsync(() => _responseByRequestId[requestId].ResponseByRequestId != null, TransportSendTimeout, MsDelay, MsFirstDelay)
                    && _responseByRequestId.TryRemove(requestId, out var obj)
                    && obj.ResponseByRequestId is SignalResponseDataMessage responseMessage)
                {
                    // Read the timestamp before the model goes back to the pool.
                    startTime = obj.RequestDateTime;
                    SignalRRequestModelPool.Return(obj);

                    if (responseMessage.Status == SignalResponseStatus.Error)
                    {
                        var errorText = $"Client SendMessageToServerAsync response error; await; tag: {messageTag}; Status: {responseMessage.Status}; ConnectionState: {GetConnectionState()}; requestId: {requestId}";
                        Logger.Error(errorText);

                        // Awaiting the faulted task throws here; the catch below logs it and default is returned.
                        return await Task.FromException(new Exception(errorText));
                    }

                    // Special case: when TResponse is SignalResponseDataMessage, return the message itself
                    // instead of trying to deserialize ResponseData (which would cause InvalidCastException)
                    if (typeof(TResponse) == typeof(SignalResponseDataMessage))
                    {
                        var serializerType = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON";
                        Logger.Info($"Client returning raw SignalResponseDataMessage ({serializerType}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                        return (TResponse)(object)responseMessage;
                    }

                    var responseData = responseMessage.GetResponseData();

                    if (responseData == null && responseMessage.Status == SignalResponseStatus.Success)
                    {
                        Logger.Info($"Client received null response. Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                        return default;
                    }

                    var serializerType2 = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON";
                    Logger.Info($"Client deserialized response ({serializerType2}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
                    return responseData;
                }

                Logger.Error($"Client timeout after: {(DateTime.UtcNow - startTime).TotalSeconds} sec! ConnectionState: {GetConnectionState()}; requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
            }
            catch (Exception ex)
            {
                Logger.Error($"Client SendMessageToServerAsync; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
            }

            // Timeout or failure: make sure the request does not stay registered.
            ReleasePendingRequest(requestId);
            return default;
        }

        /// <summary>Produces the next request id from <c>AcDomain.NextUniqueInt32</c>.</summary>
        protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32;

        /// <summary>
        /// Entry point for messages pushed by the hub. When <paramref name="requestId"/> matches a pending
        /// request, the response either is stored on the pending model (for a waiting sender) or is passed
        /// to the registered callback after the pending entry has been released. Any other message is
        /// logged and forwarded, without awaiting, to <see cref="MessageReceived"/>.
        /// </summary>
        /// <param name="messageTag">Tag identifying the message.</param>
        /// <param name="requestId">Correlation id of the request this message answers, if any.</param>
        /// <param name="signalParams">Supplies the status and serializer type of the response.</param>
        /// <param name="data">Payload bytes, stored unmodified as the response data.</param>
        /// <remarks>Exceptions are logged, the pending request (if any) is released, and the exception is rethrown.</remarks>
        public virtual Task OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, byte[] data)
        {
            var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}";

            if (data.Length == 0) Logger.Warning($"data.Length == 0! {logText}");

            try
            {
                if (requestId.HasValue && _responseByRequestId.TryGetValue(requestId.Value, out var requestModel))
                {
                    var reqId = requestId.Value;

                    requestModel.ResponseDateTime = DateTime.UtcNow;
                    Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms][{data.Length / 1024}kb]{logText}");

                    // Diagnostic logging for binary deserialization debugging
                    LogBinaryDiagnostics(messageTag, data, requestId);

                    // No envelope deserialization — construct directly from params + data
                    var responseMessage = new SignalResponseDataMessage
                    {
                        Status = signalParams.Status,
                        DataSerializerType = signalParams.DataSerializerType,
                        ResponseData = data
                    };

                    switch (requestModel.ResponseByRequestId)
                    {
                        case null:
                            // A sender is waiting on this model; it removes the entry itself.
                            requestModel.ResponseByRequestId = responseMessage;
                            return Task.CompletedTask;

                        case Action actionCallback:
                            ReleasePendingRequest(reqId);
                            actionCallback.Invoke(responseMessage);
                            return Task.CompletedTask;

                        case Func funcCallback:
                            ReleasePendingRequest(reqId);
                            return funcCallback.Invoke(responseMessage);

                        default:
                            Logger.Error($"Client OnReceiveMessage switch; unknown message type: {requestModel.ResponseByRequestId?.GetType().Name}; {ConstHelper.NameByValue(TagsName, messageTag)}");
                            break;
                    }

                    ReleasePendingRequest(reqId);
                    return Task.CompletedTask;
                }

                Logger.Info(logText);

                MessageReceived(messageTag, signalParams, data).Forget();
            }
            catch (Exception ex)
            {
                // Enhanced error logging with binary diagnostics
                if (data.Length > 0)
                {
                    LogBinaryDiagnosticsOnError(messageTag, data, requestId, ex);
                }

                if (requestId.HasValue) ReleasePendingRequest(requestId.Value);

                Logger.Error($"Client OnReceiveMessage; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
                throw;
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Enable diagnostic logging for binary deserialization debugging.
        /// Set to true to log hex dumps of received binary data.
        /// </summary>
        public bool EnableBinaryDiagnostics { get; set; } = false;

        /// <summary>
        /// Logs binary diagnostics for debugging serialization issues: a hex dump of at most the first
        /// 500 bytes, bytes 0 and 1 as version and marker, and - when marker bit 0x10 is set - up to ten
        /// length-prefixed UTF-8 header names. Does nothing unless <see cref="EnableBinaryDiagnostics"/>
        /// is set and the payload is non-empty. Never throws; failures are logged as warnings.
        /// </summary>
        private void LogBinaryDiagnostics(int messageTag, byte[] messageBytes, int? requestId)
        {
            if (!EnableBinaryDiagnostics || messageBytes.Length == 0) return;

            try
            {
                var hexDump = Convert.ToHexString(messageBytes.AsSpan(0, Math.Min(500, messageBytes.Length)));
                Logger.Info($"=== BINARY DIAGNOSTICS === Tag: {messageTag}; RequestId: {requestId}; Length: {messageBytes.Length}");
                Logger.Info($"HEX (first 500 bytes): {hexDump}");

                // Parse header info
                if (messageBytes.Length >= 3)
                {
                    var version = messageBytes[0];
                    var marker = messageBytes[1];
                    Logger.Info($"Version: {version}; Marker: 0x{marker:X2}");

                    if ((marker & 0x10) != 0)
                    {
                        var propCount = messageBytes[2];
                        Logger.Info($"Header property count: {propCount}");

                        // Parse first 10 property names
                        var pos = 3;
                        for (int i = 0; i < Math.Min((int)propCount, 10) && pos < messageBytes.Length; i++)
                        {
                            var strLen = messageBytes[pos++];

                            // A name that overruns the buffer means the header is truncated or not in this
                            // format; stop instead of reading name bytes as lengths.
                            if (pos + strLen > messageBytes.Length) break;

                            var propName = System.Text.Encoding.UTF8.GetString(messageBytes, pos, strLen);
                            pos += strLen;
                            Logger.Info($" [{i}]: '{propName}'");
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Logger.Warning($"Failed to log binary diagnostics: {ex.Message}");
            }
        }

        /// <summary>
        /// Logs binary diagnostics when an error occurs during deserialization: the error message, a hex
        /// dump of at most the first 1000 bytes, bytes 0 and 1 as version and marker, and - when marker bit
        /// 0x10 is set - all length-prefixed UTF-8 header names. Never throws; failures are logged as warnings.
        /// </summary>
        private void LogBinaryDiagnosticsOnError(int messageTag, byte[] messageBytes, int? requestId, Exception error)
        {
            try
            {
                Logger.Error($"=== BINARY DESERIALIZATION ERROR ===");
                Logger.Error($"Tag: {messageTag}; RequestId: {requestId}; Length: {messageBytes.Length}");
                Logger.Error($"Error: {error.Message}");

                var hexDump = Convert.ToHexString(messageBytes.AsSpan(0, Math.Min(1000, messageBytes.Length)));
                Logger.Error($"HEX (first 1000 bytes): {hexDump}");

                // Parse header info
                if (messageBytes.Length >= 3)
                {
                    var version = messageBytes[0];
                    var marker = messageBytes[1];
                    Logger.Error($"Version: {version}; Marker: 0x{marker:X2}");

                    if ((marker & 0x10) != 0)
                    {
                        var propCount = messageBytes[2];
                        Logger.Error($"Header property count: {propCount}");

                        // Parse ALL property names
                        var pos = 3;
                        for (int i = 0; i < propCount && pos < messageBytes.Length; i++)
                        {
                            var strLen = messageBytes[pos++];

                            // A name that overruns the buffer means the header is truncated or not in this
                            // format; stop instead of reading name bytes as lengths.
                            if (pos + strLen > messageBytes.Length) break;

                            var propName = System.Text.Encoding.UTF8.GetString(messageBytes, pos, strLen);
                            pos += strLen;
                            Logger.Error($" Header[{i}]: '{propName}'");
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Logger.Warning($"Failed to log binary diagnostics on error: {ex.Message}");
            }
        }
    }
}