diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index 7dc044c..1b959fb 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -5,7 +5,6 @@ using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading.Tasks; -using AyCode.Core.Helpers; namespace AyCode.Core.Serializers.Binaries; @@ -102,11 +101,12 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase CommitCurrentChunk(buffer, position); - // Fire-and-forget flush when previous is done + // Start next flush when previous is done; _lastFlush is retained for the next + // Grow / Flush to await (via SyncAwaitFlush). No .Forget() needed — calling it + // would consume the ValueTask and risk double-await when the next iteration waits. if (_lastFlush.IsCompleted) { _lastFlush = _pipeWriter.FlushAsync(); - _lastFlush.Forget(); } // Acquire new chunk with header reservation diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index cdf9d9d..ec21b9a 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -101,6 +101,14 @@ public class AcBinaryHubProtocol : IHubProtocol _protocolMode = protocolMode; _logger = logger; _chunkStates = new ConditionalWeakTable(); + + if (_logger != null) + { + _logger.LogInformation( + "AcBinaryHubProtocol initialized mode={ProtocolMode} chunkSize={ChunkSize} initCap={InitCap} useGen={UseGen} wireMode={WireMode} interning={Interning} compression={Compression}", + _protocolMode, _options.BufferWriterChunkSize, _options.InitialBufferCapacity, + _options.UseGeneratedCode, _options.WireMode, _options.UseStringInterning, _options.UseCompression); + } } /// @@ -176,6 +184,11 @@ public class AcBinaryHubProtocol : IHubProtocol public void WriteMessage(HubMessage message, IBufferWriter output) { + if (_logger != null) + { + _logger.LogInformation("Serialize start"); + } + // AsyncSegment: chunked protocol framing for messages with streamable arguments if (_protocolMode == BinaryProtocolMode.AsyncSegment && output is PipeWriter pipeWriter @@ -240,8 +253,13 @@ public class AcBinaryHubProtocol : IHubProtocol bw.Flush(); Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload); + if (_logger != null) + { + _logger.LogInformation("Serialize end totalSentSize={TotalSentSize}", LengthPrefixSize + totalPayload); + + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("WriteMessage {MessageType} payloadSize={PayloadSize}", message.GetType().Name, totalPayload); + } } private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, InvocationMessage m, ref int externalBytes) @@ -363,10 +381,13 @@ public class AcBinaryHubProtocol : IHubProtocol { var (streamedArg, streamedArgIndex) = GetStreamedArg(message); - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + if (_logger?.IsEnabled(LogLevel.Debug) == true) _logger.LogDebug("WriteMessageChunked {MessageType} streamedArgIndex={StreamedArgIndex} streamedArgType={StreamedArgType}", message.GetType().Name, streamedArgIndex, streamedArg?.GetType().Name ?? "null"); + int chunkStartPayload; + var dataBytes = 0; + // --- CHUNK_START (standard SignalR message framing: [INT32 len][payload]) --- { var lengthSpan = pipeWriter.GetSpan(LengthPrefixSize); @@ -419,18 +440,18 @@ public class AcBinaryHubProtocol : IHubProtocol break; } - var totalPayload = bw.Position + externalBytes; + chunkStartPayload = bw.Position + externalBytes; bw.Flush(); - Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); + Unsafe.WriteUnaligned(ref lengthSpan[0], chunkStartPayload); - _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", totalPayload); + _logger?.LogDebug("WriteMessageChunked CHUNK_START written payloadSize={PayloadSize}", chunkStartPayload); } SyncFlush(pipeWriter.FlushAsync()); // --- CHUNK_DATA ([201][UINT16 size][data] per chunk, all committed by output) --- if (streamedArg != null) { - var dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options); + dataBytes = AcBinarySerializer.Serialize(streamedArg, pipeWriter, _options); _logger?.LogDebug("WriteMessageChunked CHUNK_DATA serialized dataBytes={DataBytes}", dataBytes); } @@ -441,6 +462,18 @@ public class AcBinaryHubProtocol : IHubProtocol SyncFlush(pipeWriter.FlushAsync()); _logger?.LogTrace("WriteMessageChunked CHUNK_END written"); + + // Total wire bytes = length prefix (4) + CHUNK_START payload + CHUNK_DATA frames + CHUNK_END (1) + // Each CHUNK_DATA frame adds 3 bytes ([201][UINT16 size]) per chunkSize-worth of data + var chunkSize = _options.BufferWriterChunkSize; + var chunkCount = dataBytes > 0 ? (dataBytes + chunkSize - 1) / chunkSize : 0; + var totalSentSize = LengthPrefixSize + chunkStartPayload + chunkCount * 3 + dataBytes + 1; + + if (_logger != null) + { + _logger.LogInformation("Serialize end (chunked) dataBytes={DataBytes} chunkCount={ChunkCount} totalSentSize={TotalSentSize}", + dataBytes, chunkCount, totalSentSize); + } } /// @@ -480,7 +513,7 @@ public class AcBinaryHubProtocol : IHubProtocol // Skip both to avoid duplicate writes to state.Buffer. if (TrySkipRepresentedChunkStart(ref input)) { - _logger?.LogInformation("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length); + _logger?.LogDebug("TryParseMessage re-presented CHUNK_START detected and skipped, remainingInput={RemainingInput}", input.Length); // Also skip already-consumed chunk frame bytes (re-presented along with CHUNK_START) if (chunkState.ChunkFrameBytesConsumed > 0) @@ -492,13 +525,14 @@ public class AcBinaryHubProtocol : IHubProtocol return false; } input = input.Slice(chunkState.ChunkFrameBytesConsumed); - _logger?.LogInformation("TryParseMessage skipped {Bytes} already-consumed chunk frame bytes, remainingInput={RemainingInput}", + _logger?.LogDebug("TryParseMessage skipped {Bytes} already-consumed chunk frame bytes, remainingInput={RemainingInput}", chunkState.ChunkFrameBytesConsumed, input.Length); } } - _logger?.LogInformation("TryParseMessage chunk mode active binderHash={BinderHash} inputLength={InputLength} firstByte={FirstByte}", - binder.GetHashCode(), input.Length, input.Length > 0 ? input.FirstSpan[0] : (byte)0); + if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("TryParseMessage chunk mode active binderHash={BinderHash} inputLength={InputLength} firstByte={FirstByte}", + binder.GetHashCode(), input.Length, input.Length > 0 ? input.FirstSpan[0] : (byte)0); return TryParseChunkData(ref input, chunkState, binder, out message); } @@ -512,13 +546,24 @@ public class AcBinaryHubProtocol : IHubProtocol _logger?.LogTrace("TryParseMessage parsing payloadLength={PayloadLength} inputLength={InputLength}", payloadLength, input.Length); + if (_logger != null) + { + _logger.LogInformation("Deserialize start"); + } + message = ParseMessage(ref reader, payloadLength, binder); if (message != null) { input = input.Slice(LengthPrefixSize + payloadLength); - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name); + + if (_logger != null) + { + _logger.LogInformation("Deserialize end"); + + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("TryParseMessage parsed {MessageType}", message.GetType().Name); + } return true; } @@ -531,7 +576,7 @@ public class AcBinaryHubProtocol : IHubProtocol { // Full chunked message processed in one call input = afterChunkStart; - _logger?.LogInformation("TryParseMessage CHUNK_START + chunk data processed in single call"); + _logger?.LogDebug("TryParseMessage CHUNK_START + chunk data processed in single call"); return true; } @@ -540,7 +585,7 @@ public class AcBinaryHubProtocol : IHubProtocol // the buffer state becomes inconsistent on re-submission. // On next call, the buffer may re-present CHUNK_START bytes; the chunk-mode // block above handles that via TrySkipRepresentedChunkStart. - _logger?.LogInformation("TryParseMessage CHUNK_START parsed, state added, waiting for chunk data (not advancing)"); + _logger?.LogDebug("TryParseMessage CHUNK_START parsed, state added, waiting for chunk data (not advancing)"); return false; } @@ -807,8 +852,13 @@ public class AcBinaryHubProtocol : IHubProtocol { deserializedArg = state.DeserTask.GetAwaiter().GetResult(); - if (_logger != null && _logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); + if (_logger != null) + { + _logger.LogInformation("Deserialize end (chunked)"); + + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("TryParseChunkData deserialization complete resultType={ResultType}", deserializedArg?.GetType().Name ?? "null"); + } } } catch (Exception ex) @@ -893,7 +943,7 @@ public class AcBinaryHubProtocol : IHubProtocol }; _chunkStates.AddOrUpdate(binder, state); - _logger?.LogInformation("ParseAsyncChunkStart _chunkStates.AddOrUpdate binderHash={BinderHash} streamedArgType={TargetType}", + _logger?.LogDebug("ParseAsyncChunkStart _chunkStates.AddOrUpdate binderHash={BinderHash} streamedArgType={TargetType}", binder.GetHashCode(), streamedType.Name); return null; // chunk mode activated, next TryParseMessage goes to TryParseChunkData } diff --git a/AyCode.Services/SignalRs/AcSignalRClientBase.cs b/AyCode.Services/SignalRs/AcSignalRClientBase.cs index 0516b0b..727112c 100644 --- a/AyCode.Services/SignalRs/AcSignalRClientBase.cs +++ b/AyCode.Services/SignalRs/AcSignalRClientBase.cs @@ -54,7 +54,7 @@ namespace AyCode.Services.SignalRs .ConfigureLogging(logging => { // alap minimális MS log level - logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Warning); + logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Information); // regisztráljuk az AcLoggerProvider-t úgy, hogy visszaadja a meglévő Logger példányt logging.AddAcLogger(_ => Logger);