AyCode.Core/AyCode.Services/SignalRs/AcSignalRClientBase.cs

481 lines
24 KiB
C#

using System.Collections.Concurrent;
using AyCode.Core;
using AyCode.Core.Extensions;
using AyCode.Core.Helpers;
using AyCode.Core.Loggers;
using AyCode.Core.Serializers.Jsons;
using AyCode.Interfaces.Entities;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
namespace AyCode.Services.SignalRs
{
public abstract class AcSignalRClientBase : IAcSignalRHubClient
{
private readonly ConcurrentDictionary<int, SignalRRequestModel> _responseByRequestId = new();
protected readonly HubConnection? HubConnection;
protected readonly AcLoggerBase Logger;
protected abstract Task MessageReceived(int messageTag, byte[] messageBytes);
public int MsDelay = 25;
public int MsFirstDelay = 50;
public int ConnectionTimeout = 10000;
public int TransportSendTimeout = 60000;
private const string TagsName = "SignalRTags";
protected AcSignalRClientBase(string fullHubName, AcLoggerBase logger)
{
Logger = logger;
Logger.Detail(fullHubName);
HubConnection = new HubConnectionBuilder()
.WithUrl(fullHubName, HttpTransportType.WebSockets,
options =>
{
options.TransportMaxBufferSize = 30_000_000;
options.ApplicationMaxBufferSize = 30_000_000;
options.CloseTimeout = TimeSpan.FromSeconds(10);
options.SkipNegotiation = true;
})
.WithAutomaticReconnect()
.WithStatefulReconnect()
.WithKeepAliveInterval(TimeSpan.FromSeconds(60))
.WithServerTimeout(TimeSpan.FromSeconds(180))
.Build();
HubConnection.Closed += HubConnection_Closed;
_ = HubConnection.On<int, byte[], int?>(nameof(IAcSignalRHubClient.OnReceiveMessage), OnReceiveMessage);
}
protected AcSignalRClientBase(AcLoggerBase logger)
{
Logger = logger;
HubConnection = null;
}
private Task HubConnection_Closed(Exception? arg)
{
if (_responseByRequestId.IsEmpty) Logger.DebugConditional("Client HubConnection_Closed");
else Logger.Warning($"Client HubConnection_Closed; {nameof(_responseByRequestId)} count: {_responseByRequestId.Count}");
ClearPendingRequests();
return Task.CompletedTask;
}
#region Connection State Methods
protected virtual HubConnectionState GetConnectionState()
=> HubConnection?.State ?? HubConnectionState.Disconnected;
protected virtual bool IsConnected()
=> GetConnectionState() == HubConnectionState.Connected;
protected virtual Task StartConnectionInternal()
=> HubConnection?.StartAsync() ?? Task.CompletedTask;
protected virtual Task StopConnectionInternal()
=> HubConnection?.StopAsync() ?? Task.CompletedTask;
protected virtual ValueTask DisposeConnectionInternal()
=> HubConnection?.DisposeAsync() ?? ValueTask.CompletedTask;
protected virtual Task SendToHubAsync(int messageTag, byte[]? messageBytes, int? requestId)
=> HubConnection?.SendAsync(nameof(IAcSignalRHubClient.OnReceiveMessage), messageTag, messageBytes, requestId) ?? Task.CompletedTask;
#endregion
#region Protected Test Helpers
protected ConcurrentDictionary<int, SignalRRequestModel> GetPendingRequests() => _responseByRequestId;
protected void ClearPendingRequests() => _responseByRequestId.Clear();
protected void RegisterPendingRequest(int requestId, SignalRRequestModel model) => _responseByRequestId[requestId] = model;
#endregion
public async Task StartConnection()
{
if (GetConnectionState() == HubConnectionState.Disconnected)
await StartConnectionInternal();
if (!IsConnected())
await TaskHelper.WaitToAsync(IsConnected, ConnectionTimeout, 10, 25);
}
public async Task StopConnection()
{
await StopConnectionInternal();
await DisposeConnectionInternal();
}
public virtual Task SendMessageToServerAsync(int messageTag)
=> SendMessageToServerAsync(messageTag, null, GetNextRequestId());
public virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, int? requestId)
{
Logger.DebugConditional($"Client SendMessageToServerAsync sending; {nameof(requestId)}: {requestId}; ConnectionState: {GetConnectionState()}; {ConstHelper.NameByValue(TagsName, messageTag)}");
await StartConnection();
var msgBytes = message != null ? SignalRSerializationHelper.SerializeToBinary(message) : null;
if (!IsConnected())
{
Logger.Error($"Client SendMessageToServerAsync error! ConnectionState: {GetConnectionState()};");
return;
}
await SendToHubAsync(messageTag, msgBytes, requestId);
}
#region CRUD
public virtual Task<TResponseData?> PostAsync<TResponseData>(int messageTag, object parameter)
=> SendMessageToServerAsync<TResponseData>(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(parameter)), GetNextRequestId());
public virtual Task<TResponseData?> PostAsync<TResponseData>(int messageTag, object[] parameters)
=> SendMessageToServerAsync<TResponseData>(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(parameters)), GetNextRequestId());
public virtual Task<TResponseData?> GetByIdAsync<TResponseData>(int messageTag, object id)
=> PostAsync<TResponseData?>(messageTag, id);
public virtual Task<TResponseData?> GetByIdAsync<TResponseData>(int messageTag, object[] ids)
=> PostAsync<TResponseData?>(messageTag, ids);
/// <summary>
/// Gets data by ID with async callback response. Callback is second parameter.
/// </summary>
public virtual Task GetByIdAsync<TResponseData>(int messageTag, Func<SignalResponseDataMessage, Task> responseCallback, object id)
=> SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(id)), responseCallback);
/// <summary>
/// Gets data by IDs with async callback response. Callback is second parameter.
/// </summary>
public virtual Task GetByIdAsync<TResponseData>(int messageTag, Func<SignalResponseDataMessage, Task> responseCallback, object[] ids)
=> SendMessageToServerAsync(messageTag, new SignalPostJsonDataMessage<IdMessage>(new IdMessage(ids)), responseCallback);
public virtual Task<TResponseData?> GetAllAsync<TResponseData>(int messageTag)
=> SendMessageToServerAsync<TResponseData>(messageTag);
public virtual Task<TResponseData?> GetAllAsync<TResponseData>(int messageTag, object[]? contextParams)
=> SendMessageToServerAsync<TResponseData>(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage<IdMessage>(new IdMessage(contextParams)), GetNextRequestId());
/// <summary>
/// Gets all data with async callback response. Callback is second parameter.
/// </summary>
public virtual Task GetAllAsync<TResponseData>(int messageTag, Func<SignalResponseDataMessage, Task> responseCallback)
=> SendMessageToServerAsync(messageTag, null, responseCallback);
/// <summary>
/// Gets all data with context params and async callback response.
/// </summary>
public virtual Task GetAllAsync<TResponseData>(int messageTag, Func<SignalResponseDataMessage, Task> responseCallback, object[]? contextParams)
=> SendMessageToServerAsync(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage<IdMessage>(new IdMessage(contextParams)), responseCallback);
public virtual Task<TPostData?> PostDataAsync<TPostData>(int messageTag, TPostData postData) where TPostData : class
=> SendMessageToServerAsync<TPostData>(messageTag, CreatePostMessage(postData), GetNextRequestId());
public virtual Task<TResponseData?> PostDataAsync<TPostData, TResponseData>(int messageTag, TPostData postData)
=> SendMessageToServerAsync<TResponseData>(messageTag, CreatePostMessage(postData), GetNextRequestId());
/// <summary>
/// Posts data with async callback response.
/// </summary>
public virtual Task PostDataAsync<TPostData>(int messageTag, TPostData postData, Func<SignalResponseDataMessage, Task> responseCallback)
=> SendMessageToServerAsync(messageTag, CreatePostMessage(postData), responseCallback);
/// <summary>
/// Posts data with typed async callback response.
/// </summary>
public virtual Task PostDataAsync<TPostData, TResponseData>(int messageTag, TPostData postData, Func<SignalResponseDataMessage, Task> responseCallback)
=> SendMessageToServerAsync(messageTag, CreatePostMessage(postData), responseCallback);
/// <summary>
/// Posts data and invokes callback with response. Fire-and-forget friendly for background saves.
/// </summary>
public virtual Task PostDataAsync<TPostData>(int messageTag, TPostData postData, Action<SignalResponseDataMessage> responseCallback)
{
var requestId = GetNextRequestId();
var requestModel = SignalRRequestModelPool.Get(responseCallback);
_responseByRequestId[requestId] = requestModel;
return SendMessageToServerAsync(messageTag, CreatePostMessage(postData), requestId);
}
private static ISignalRMessage CreatePostMessage<TPostData>(TPostData postData)
{
var type = typeof(TPostData);
if (type == typeof(string) || type.IsEnum || type.IsValueType)
return new SignalPostJsonDataMessage<IdMessage>(new IdMessage(postData!));
return new SignalPostJsonDataMessage<TPostData>(postData);
}
public Task GetAllIntoAsync<TResponseItem>(List<TResponseItem> intoList, int messageTag, object[]? contextParams = null, Action? callback = null) where TResponseItem : IEntityGuid
{
return GetAllAsync<List<TResponseItem>>(messageTag, contextParams).ContinueWith(task =>
{
var logText = $"GetAllIntoAsync<{typeof(TResponseItem).Name}>(); dataCount: {task.Result?.Count}; {ConstHelper.NameByValue(TagsName, messageTag)};";
intoList.Clear();
if (task.Result != null)
{
Logger.Debug(logText);
intoList.AddRange(task.Result);
}
else Logger.Error(logText);
callback?.Invoke();
}, TaskScheduler.Default);
}
#endregion
public virtual Task<TResponse?> SendMessageToServerAsync<TResponse>(int messageTag)
=> SendMessageToServerAsync<TResponse>(messageTag, null, GetNextRequestId());
public virtual Task<TResponse?> SendMessageToServerAsync<TResponse>(int messageTag, ISignalRMessage? message)
=> SendMessageToServerAsync<TResponse>(messageTag, message, GetNextRequestId());
/// <summary>
/// Sends message to server with async callback response.
/// </summary>
public virtual async Task SendMessageToServerAsync(int messageTag, ISignalRMessage? message, Func<SignalResponseDataMessage, Task> responseCallback)
{
var requestId = GetNextRequestId();
var requestModel = SignalRRequestModelPool.Get(responseCallback);
_responseByRequestId[requestId] = requestModel;
await SendMessageToServerAsync(messageTag, message, requestId);
}
protected virtual async Task<TResponse?> SendMessageToServerAsync<TResponse>(int messageTag, ISignalRMessage? message, int requestId)
{
Logger.DebugConditional($"Client SendMessageToServerAsync<TResult>; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}");
var startTime = DateTime.Now;
var requestModel = SignalRRequestModelPool.Get();
_responseByRequestId[requestId] = requestModel;
await SendMessageToServerAsync(messageTag, message, requestId);
try
{
if (await TaskHelper.WaitToAsync(() => _responseByRequestId[requestId].ResponseByRequestId != null, TransportSendTimeout, MsDelay, MsFirstDelay) &&
_responseByRequestId.TryRemove(requestId, out var obj) && obj.ResponseByRequestId is SignalResponseDataMessage responseMessage)
{
startTime = obj.RequestDateTime;
SignalRRequestModelPool.Return(obj);
if (responseMessage.Status == SignalResponseStatus.Error)
{
var errorText = $"Client SendMessageToServerAsync<TResponseData> response error; await; tag: {messageTag}; Status: {responseMessage.Status}; ConnectionState: {GetConnectionState()}; requestId: {requestId}";
Logger.Error(errorText);
return await Task.FromException<TResponse>(new Exception(errorText));
}
// Special case: when TResponse is SignalResponseDataMessage, return the message itself
// instead of trying to deserialize ResponseData (which would cause InvalidCastException)
if (typeof(TResponse) == typeof(SignalResponseDataMessage))
{
var serializerType = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON";
Logger.Info($"Client returning raw SignalResponseDataMessage ({serializerType}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
return (TResponse)(object)responseMessage;
}
var responseData = responseMessage.GetResponseData<TResponse>();
if (responseData == null && responseMessage.Status == SignalResponseStatus.Success)
{
Logger.Info($"Client received null response. Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
return default;
}
var serializerType2 = responseMessage.DataSerializerType == AcSerializerType.Binary ? "Binary" : "JSON";
Logger.Info($"Client deserialized response ({serializerType2}). Total: {(DateTime.UtcNow.Subtract(startTime)).TotalMilliseconds} ms! requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
return responseData;
}
Logger.Error($"Client timeout after: {(DateTime.Now - startTime).TotalSeconds} sec! ConnectionState: {GetConnectionState()}; requestId: {requestId}; tag: {messageTag} [{ConstHelper.NameByValue(TagsName, messageTag)}]");
}
catch (Exception ex)
{
Logger.Error($"Client SendMessageToServerAsync; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
}
if (_responseByRequestId.TryRemove(requestId, out var removedModel))
SignalRRequestModelPool.Return(removedModel);
return default;
}
protected virtual int GetNextRequestId() => AcDomain.NextUniqueInt32;
public virtual Task OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId)
{
var logText = $"Client OnReceiveMessage; {nameof(requestId)}: {requestId}; {ConstHelper.NameByValue(TagsName, messageTag)}";
if (messageBytes.Length == 0) Logger.Warning($"message.Length == 0! {logText}");
try
{
if (requestId.HasValue && _responseByRequestId.TryGetValue(requestId.Value, out var requestModel))
{
var reqId = requestId.Value;
requestModel.ResponseDateTime = DateTime.UtcNow;
Logger.Debug($"[{requestModel.ResponseDateTime.Subtract(requestModel.RequestDateTime).TotalMilliseconds:N0}ms][{messageBytes.Length / 1024}kb]{logText}");
// Diagnostic logging for binary deserialization debugging
LogBinaryDiagnostics(messageTag, messageBytes, requestId);
var responseMessage = SignalRSerializationHelper.DeserializeFromBinary<SignalResponseDataMessage>(messageBytes) ?? new SignalResponseDataMessage();
switch (requestModel.ResponseByRequestId)
{
case null:
requestModel.ResponseByRequestId = responseMessage;
return Task.CompletedTask;
case Action<SignalResponseDataMessage> actionCallback:
if (_responseByRequestId.TryRemove(reqId, out var actionModel))
SignalRRequestModelPool.Return(actionModel);
actionCallback.Invoke(responseMessage);
return Task.CompletedTask;
case Func<SignalResponseDataMessage, Task> funcCallback:
if (_responseByRequestId.TryRemove(reqId, out var funcModel))
SignalRRequestModelPool.Return(funcModel);
return funcCallback.Invoke(responseMessage);
default:
Logger.Error($"Client OnReceiveMessage switch; unknown message type: {requestModel.ResponseByRequestId?.GetType().Name}; {ConstHelper.NameByValue(TagsName, messageTag)}");
break;
}
if (_responseByRequestId.TryRemove(reqId, out var removedModel))
SignalRRequestModelPool.Return(removedModel);
return Task.CompletedTask;
}
Logger.Info(logText);
MessageReceived(messageTag, messageBytes).Forget();
}
catch (Exception ex)
{
// Enhanced error logging with binary diagnostics
if (messageBytes.Length > 0)
{
LogBinaryDiagnosticsOnError(messageTag, messageBytes, requestId, ex);
}
if (requestId.HasValue && _responseByRequestId.TryRemove(requestId.Value, out var exModel))
SignalRRequestModelPool.Return(exModel);
Logger.Error($"Client OnReceiveMessage; requestId: {requestId}; ConnectionState: {GetConnectionState()}; {ex.Message}; {ConstHelper.NameByValue(TagsName, messageTag)}", ex);
throw;
}
return Task.CompletedTask;
}
/// <summary>
/// Enable diagnostic logging for binary deserialization debugging.
/// Set to true to log hex dumps of received binary data.
/// </summary>
public bool EnableBinaryDiagnostics { get; set; } = false;
/// <summary>
/// Logs binary diagnostics for debugging serialization issues.
/// </summary>
private void LogBinaryDiagnostics(int messageTag, byte[] messageBytes, int? requestId)
{
if (!EnableBinaryDiagnostics || messageBytes.Length == 0) return;
try
{
var hexDump = Convert.ToHexString(messageBytes.AsSpan(0, Math.Min(500, messageBytes.Length)));
Logger.Info($"=== BINARY DIAGNOSTICS === Tag: {messageTag}; RequestId: {requestId}; Length: {messageBytes.Length}");
Logger.Info($"HEX (first 500 bytes): {hexDump}");
// Parse header info
if (messageBytes.Length >= 3)
{
var version = messageBytes[0];
var marker = messageBytes[1];
Logger.Info($"Version: {version}; Marker: 0x{marker:X2}");
if ((marker & 0x10) != 0 && messageBytes.Length > 2)
{
var propCount = messageBytes[2];
Logger.Info($"Header property count: {propCount}");
// Parse first 10 property names
var pos = 3;
for (int i = 0; i < Math.Min((int)propCount, 10) && pos < messageBytes.Length; i++)
{
var strLen = messageBytes[pos++];
if (pos + strLen <= messageBytes.Length)
{
var propName = System.Text.Encoding.UTF8.GetString(messageBytes, pos, strLen);
pos += strLen;
Logger.Info($" [{i}]: '{propName}'");
}
}
}
}
}
catch (Exception ex)
{
Logger.Warning($"Failed to log binary diagnostics: {ex.Message}");
}
}
/// <summary>
/// Logs binary diagnostics when an error occurs during deserialization.
/// </summary>
private void LogBinaryDiagnosticsOnError(int messageTag, byte[] messageBytes, int? requestId, Exception error)
{
try
{
Logger.Error($"=== BINARY DESERIALIZATION ERROR ===");
Logger.Error($"Tag: {messageTag}; RequestId: {requestId}; Length: {messageBytes.Length}");
Logger.Error($"Error: {error.Message}");
var hexDump = Convert.ToHexString(messageBytes.AsSpan(0, Math.Min(1000, messageBytes.Length)));
Logger.Error($"HEX (first 1000 bytes): {hexDump}");
// Parse header info
if (messageBytes.Length >= 3)
{
var version = messageBytes[0];
var marker = messageBytes[1];
Logger.Error($"Version: {version}; Marker: 0x{marker:X2}");
if ((marker & 0x10) != 0 && messageBytes.Length > 2)
{
var propCount = messageBytes[2];
Logger.Error($"Header property count: {propCount}");
// Parse ALL property names
var pos = 3;
for (int i = 0; i < propCount && pos < messageBytes.Length; i++)
{
var strLen = messageBytes[pos++];
if (pos + strLen <= messageBytes.Length)
{
var propName = System.Text.Encoding.UTF8.GetString(messageBytes, pos, strLen);
pos += strLen;
Logger.Error($" Header[{i}]: '{propName}'");
}
}
}
}
}
catch (Exception ex)
{
Logger.Warning($"Failed to log binary diagnostics on error: {ex.Message}");
}
}
}
}