diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 4620264..5dd7248 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -12,6 +12,7 @@ You are operating in a multi-repo, documentation-first architecture. You MUST ST - Your VERY FIRST AND ONLY allowed tool calls must be `file_search` or `get_file` targeting the `.md` documentation in the relevant `docs/` folders or `README.md`. - Do not answer the user's core question until the `[LOADED_DOCS]` list is populated with the base architecture files. - **CRITICAL EXCEPTION:** Do **NOT** re-read `.md` files that are already mapped in your context or `LOADED_DOCS` list (strictly maintain rule 20). + - **CROSS-REPO HARD-GATE:** When navigating to an external repo (via `own-dep-repos` paths), read that repo's `docs/` and `README.md` BEFORE searching its source code. The hard-gate applies to EVERY repo you enter, not just your own. 3. **STRICT NO-RE-READ POLICY (ANTI-LOOP):** You are PHYSICALLY FORBIDDEN from calling `get_file` or `file_search` on any `.md` file that is already listed in your `[LOADED_DOCS]` prefix. @@ -54,7 +55,7 @@ You are operating in a multi-repo, documentation-first architecture. You MUST ST 6. **AcJson** — Newtonsoft.Json wrapper with $id/$ref, IId-based reference resolution, and chain API. ## SignalR -7. **Single-method transport** — all SignalR communication uses `OnReceiveMessage(tag, bytes, requestId)`. Tags are `int` constants resolved via `DynamicMethodRegistry`. Never add conventional hub methods. +7. **Tag-based transport (no conventional hub methods)** — SignalR communication should generally use the generic methods provided by `AcWebSignalRHubBase` (server) and `AcSignalRClientBase` (client). Request types are conventionally identified by `int` tags. Try to avoid adding custom, business-specific, or conventional string-based Hub methods (e.g., `GetUsers()`). 8. **AcSignalRDataSource** — generic `IList` with change tracking, CRUD via `SignalRCrudTags`, binary merge, rollback. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`. Transport docs: `AyCode.Services/docs/SIGNALR.md`. 9. **JSON-in-Binary tech debt** — client→server request parameters are currently JSON inside a Binary envelope (`SignalPostJsonDataMessage`). Do NOT attempt to fix as a side effect — requires coordinated changes across all consuming projects. diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs index 018236f..0c94b8d 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializer.cs @@ -290,10 +290,10 @@ public static partial class AcBinarySerializer public static byte[] Serialize(T value) => Serialize(value, AcBinarySerializerOptions.Default); /// - /// Serialize object to an IBufferWriter with default options. + /// Serialize object to an IBufferWriter with default options. Returns bytes written. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void Serialize(T value, IBufferWriter writer) => Serialize(value, writer, AcBinarySerializerOptions.Default); + public static int Serialize(T value, IBufferWriter writer) => Serialize(value, writer, AcBinarySerializerOptions.Default); /// /// Serialize object to binary with specified options. @@ -381,14 +381,14 @@ public static partial class AcBinarySerializer /// Uses BufferWriterBinaryOutput — writes directly to the caller's buffer. /// Note: Compression is applied if enabled in options. /// - public static void Serialize(T value, IBufferWriter writer, AcBinarySerializerOptions options) + public static int Serialize(T value, IBufferWriter writer, AcBinarySerializerOptions options) { if (value == null) { var span = writer.GetSpan(1); span[0] = BinaryTypeCode.Null; writer.Advance(1); - return; + return 1; } var runtimeType = value.GetType(); @@ -408,7 +408,7 @@ public static partial class AcBinarySerializer } var context = BinarySerializationContextPool.Get(options); - context.Output = new BufferWriterBinaryOutput(writer); + context.Output = new BufferWriterBinaryOutput(writer, options.BufferWriterChunkSize); context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd); try @@ -420,17 +420,15 @@ public static partial class AcBinarySerializer // Apply compression if enabled if (options.UseCompression != Lz4CompressionMode.None) { - // For compression with BufferWriter, we need to flush first then compress - // This path is less common — compression typically uses byte[] path context.Output.Flush(context._buffer, context._position); - // Compression with IBufferWriter requires intermediate buffer - // Fall back to ArrayBinaryOutput path for compression throw new NotSupportedException( "Compression is not supported with IBufferWriter output. " + "Use the byte[] overload or disable compression."); } + var bytesWritten = context.Output.GetTotalPosition(context._position); context.Output.Flush(context._buffer, context._position); + return bytesWritten; } finally { diff --git a/AyCode.Core/Serializers/Binaries/AcBinarySerializerOptions.cs b/AyCode.Core/Serializers/Binaries/AcBinarySerializerOptions.cs index 8bf9cb8..52acdb7 100644 --- a/AyCode.Core/Serializers/Binaries/AcBinarySerializerOptions.cs +++ b/AyCode.Core/Serializers/Binaries/AcBinarySerializerOptions.cs @@ -130,6 +130,33 @@ public sealed class AcBinarySerializerOptions : AcSerializerOptions /// public int InitialBufferCapacity { get; init; } = 4096; + /// + /// Chunk size (in bytes) used by when writing to an . + /// Controls how much data is accumulated before committing (Advance + GetMemory) to the underlying writer. + /// + /// How it works: The serializer writes into a chunk buffer. When the chunk fills up, + /// it commits the written bytes to the IBufferWriter and acquires a new chunk. Larger chunks mean + /// fewer Grow() calls (less overhead), but consume more memory per chunk. Smaller chunks reduce + /// memory footprint and latency-to-first-byte for streaming, but increase Grow() call frequency. + /// + /// Choosing a value: + /// + /// Memory-backed writers (ArrayPooledBufferWriter, file/DB blob): use 65536 (64 KB, the default). + /// Stays below the .NET LOH threshold (85 KB), minimizes Grow() overhead for large payloads. + /// An 8 MB payload triggers ~128 Grow() calls. + /// Network streaming (Kestrel PipeWriter, SignalR): use 4096 (4 KB). + /// Aligns with Kestrel's default memory pool slab size and TCP segment sizes (~1500 byte MTU × 3). + /// Reduces latency-to-first-byte by flushing data to the wire sooner. + /// + /// + /// Impact of wrong value: Using 64 KB on a network stream adds minor latency for the first chunk. + /// Using 4 KB for a memory-backed writer causes ~16× more Grow() calls than necessary (2048 vs 128 for 8 MB). + /// The default (64 KB) is the safe choice — suboptimal on network streams but never catastrophic. + /// + /// Default: 65536 (64 KB) + /// + public int BufferWriterChunkSize { get; init; } = 65536; + /// /// Optional property-level filter invoked before metadata registration and serialization. /// Return false to exclude the property from the payload. diff --git a/AyCode.Core/Serializers/Binaries/ArrayBinaryOutput.cs b/AyCode.Core/Serializers/Binaries/ArrayBinaryOutput.cs index 49382b0..db9684b 100644 --- a/AyCode.Core/Serializers/Binaries/ArrayBinaryOutput.cs +++ b/AyCode.Core/Serializers/Binaries/ArrayBinaryOutput.cs @@ -61,10 +61,12 @@ public struct ArrayBinaryOutput : IBinaryOutputBase, IDisposable #region Result Extraction — receive buffer/position from context + //TODO: miért nem static a AsSpan? /// Returns the written data as a ReadOnlySpan without allocation. [MethodImpl(MethodImplOptions.AggressiveInlining)] public ReadOnlySpan AsSpan(byte[] buffer, int position) => buffer.AsSpan(0, position); + //TODO: miért nem static a ToArray? Miért nem valami static common osztályban van? /// Copies the written data to a new exactly-sized array. public byte[] ToArray(byte[] buffer, int position) { @@ -81,6 +83,7 @@ public struct ArrayBinaryOutput : IBinaryOutputBase, IDisposable writer.Advance(position); } + //TODO: miért nem static a DetachResult? /// /// Detaches the internal buffer as a BinarySerializationResult and allocates a fresh buffer. /// The caller owns the returned result and must dispose it to return the buffer to the pool. diff --git a/AyCode.Core/Serializers/Binaries/BinaryTypeCode.cs b/AyCode.Core/Serializers/Binaries/BinaryTypeCode.cs index ba07470..23c761a 100644 --- a/AyCode.Core/Serializers/Binaries/BinaryTypeCode.cs +++ b/AyCode.Core/Serializers/Binaries/BinaryTypeCode.cs @@ -1,5 +1,7 @@ using System.Runtime.CompilerServices; +[assembly: InternalsVisibleTo("AyCode.Services")] + namespace AyCode.Core.Serializers.Binaries; /// diff --git a/AyCode.Core/Serializers/Binaries/BufferWriterBinaryOutput.cs b/AyCode.Core/Serializers/Binaries/BufferWriterBinaryOutput.cs index 4d6b4ad..cbc44e7 100644 --- a/AyCode.Core/Serializers/Binaries/BufferWriterBinaryOutput.cs +++ b/AyCode.Core/Serializers/Binaries/BufferWriterBinaryOutput.cs @@ -23,9 +23,8 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase { private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false); - private const int MinChunkRequest = 256; - private readonly IBufferWriter _writer; + private readonly int _chunkSize; private int _committedBytes; // total bytes Advanced to writer so far private int _currentChunkStart; // _position value at start of current chunk private bool _ownedBuffer; // true if current buffer is from ArrayPool (fallback path) @@ -35,12 +34,13 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase private int _position; private int _bufferEnd; - public BufferWriterBinaryOutput(IBufferWriter writer) + public BufferWriterBinaryOutput(IBufferWriter writer, int chunkSize = 65536) { _writer = writer; + _chunkSize = chunkSize; // Initialize standalone buffer for direct write usage _committedBytes = 0; - AcquireChunk(MinChunkRequest, out _buffer, out _position, out _bufferEnd); + AcquireChunk(_chunkSize, out _buffer, out _position, out _bufferEnd); _currentChunkStart = _position; } @@ -51,7 +51,7 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase public void Initialize(out byte[] buffer, out int position, out int bufferEnd) { _committedBytes = 0; - AcquireChunk(MinChunkRequest, out buffer, out position, out bufferEnd); + AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd); _currentChunkStart = position; } @@ -78,7 +78,7 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase } // Acquire new chunk - AcquireChunk(Math.Max(needed, MinChunkRequest), out buffer, out position, out bufferEnd); + AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd); _currentChunkStart = position; } @@ -125,7 +125,7 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) { // Use GetMemory so we can extract the backing array via TryGetArray - var actualRequest = Math.Max(requestSize, MinChunkRequest); + var actualRequest = Math.Max(requestSize, _chunkSize); var memory = _writer.GetMemory(actualRequest); if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && segment.Array != null) @@ -248,5 +248,26 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase Flush(_buffer, _position); } + /// + /// Commits pending bytes and invalidates the current chunk so that the underlying + /// IBufferWriter can be used directly (e.g. by AcBinarySerializer). + /// The next standalone write will re-acquire a fresh chunk via Grow. + /// + public void FlushAndReset() + { + var bytesInChunk = _position - _currentChunkStart; + if (bytesInChunk > 0) + { + if (_ownedBuffer) + FlushOwnedBuffer(_buffer, bytesInChunk); + else + _writer.Advance(bytesInChunk); + _committedBytes += bytesInChunk; + } + // Invalidate chunk — next write triggers Grow → AcquireChunk + _position = _bufferEnd; + _currentChunkStart = _bufferEnd; + } + #endregion } diff --git a/AyCode.Core/Serializers/Binaries/README.md b/AyCode.Core/Serializers/Binaries/README.md index 35eba99..f554278 100644 --- a/AyCode.Core/Serializers/Binaries/README.md +++ b/AyCode.Core/Serializers/Binaries/README.md @@ -53,7 +53,7 @@ For the complete wire format specification (encoding rules, header format, inter ### I/O Strategies - **`BinaryOutputBase.cs`** — Output interface. - **`ArrayBinaryOutput.cs`** — `ArrayPool`-backed output, fastest for `byte[]` result. -- **`BufferWriterBinaryOutput.cs`** — `IBufferWriter`-backed output for streaming. +- **`BufferWriterBinaryOutput.cs`** — `IBufferWriter`-backed output for streaming. Two modes: context mode (serialization pipeline) and standalone mode (direct write methods for framing, e.g. `AcBinaryHubProtocol`). - **`ArrayPooledBufferWriter.cs`** — Concrete `IBufferWriter` implementation. - **`IBinaryInputBase.cs`** — Input interface. - **`ArrayBinaryInput.cs`** — Single contiguous `byte[]` input. diff --git a/AyCode.Core/docs/BINARY_IMPLEMENTATION.md b/AyCode.Core/docs/BINARY_IMPLEMENTATION.md index 2ca75dd..cc35574 100644 --- a/AyCode.Core/docs/BINARY_IMPLEMENTATION.md +++ b/AyCode.Core/docs/BINARY_IMPLEMENTATION.md @@ -33,7 +33,7 @@ To support both `byte[]` returns and streaming models (via `IBufferWriter` The `TOutput` struct is **only** invoked on the cold path (when `_position >= _bufferEnd`). - `ArrayBinaryOutput`: Rents a newly doubled array from `ArrayPool`, copies existing data across, and returns the old array to the pool. When serialization finishes, a final buffer slice is returned (often avoiding `ToArray()` allocations altogether if wrapped in a `BinarySerializationResult`). -- `BufferWriterBinaryOutput`: Attempts to acquire a new chunk directly from the underlying `IBufferWriter` using `MemoryMarshal.TryGetArray`. If the `IBufferWriter` isn't backed by an array (e.g. native memory), it falls back to renting a temporary buffer from `ArrayPool`, writing to it, and later copying the chunk to the `IBufferWriter` on `Flush()` or `Grow()`. Otherwise, it writes directly to the `IBufferWriter`'s backing array without extra copying. +- `BufferWriterBinaryOutput`: Two usage modes. **Context mode** (Initialize/Grow/Flush): used by `BinarySerializationContext` for the serialization pipeline. **Standalone mode** (WriteByte/WriteVarUInt/WriteStringUtf8/etc.): direct write methods for use outside the serialization pipeline (e.g. `AcBinaryHubProtocol` frame headers). Both modes use a cached chunk pattern: acquires a large chunk once via `GetMemory` + `MemoryMarshal.TryGetArray`, extracts the backing array, and writes with direct indexing. If the `IBufferWriter` isn't backed by an array (e.g. native memory), it falls back to renting a temporary buffer from `ArrayPool`. `FlushAndReset()` commits pending bytes and invalidates the chunk, allowing the underlying `IBufferWriter` to be used directly by another writer (e.g. the serializer) before the BWO re-acquires a fresh chunk on the next write. Because `TOutput` is a generic struct constraint, the JIT completely devirtualizes the `Grow()` calls. diff --git a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs index e286be8..709939f 100644 --- a/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs +++ b/AyCode.Services.Server/SignalRs/AcWebSignalRHubBase.cs @@ -65,6 +65,102 @@ public abstract class AcWebSignalRHubBase(IConfiguration return ProcessOnReceiveMessage(messageTag, messageBytes, requestId, null); } + public virtual IAsyncEnumerable OnReceiveStreamMessage(int messageTag, byte[]? messageBytes) + { + return ProcessOnStreamMessage(messageTag, messageBytes, Context.ConnectionAborted); + } + + protected virtual async IAsyncEnumerable ProcessOnStreamMessage(int messageTag, byte[]? message, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + var tagName = ConstHelper.NameByValue(messageTag); + + Logger.Debug($"[{message?.Length ?? 0:N0}b] Server OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}"); + + try + { + if (AcDomain.IsDeveloperVersion) LogContextUserNameAndId(); + + if (TryFindAndInvokeMethod(messageTag, message, tagName, out var responseData)) + { + if (responseData == null) + yield break; + + var resultType = responseData.GetType(); + var elementType = GetAsyncEnumerableElementType(resultType); + + if (elementType != null) + { + var typedEnumerable = GetTypedStream(elementType, responseData, messageTag, cancellationToken); + await foreach (var chunk in typedEnumerable.WithCancellation(cancellationToken)) + { + yield return chunk; + } + } + else + { + Logger.Warning($"Method '{tagName}' does not return IAsyncEnumerable. Returning normal message as single chunk."); + var responseMessage = CreateResponseMessage(messageTag, SignalResponseStatus.Success, responseData); + yield return SignalRSerializationHelper.SerializeToBinary(responseMessage); + } + } + else + { + Logger.Warning($"Not found dynamic method for the tag! {tagName}"); + } + } + finally + { + Logger.Debug($"Server closed OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}"); + } + } + + private static readonly System.Collections.Concurrent.ConcurrentDictionary _streamMethods = new(); + + private IAsyncEnumerable GetTypedStream(Type elementType, object responseData, int messageTag, CancellationToken ct) + { + var methodInfo = _streamMethods.GetOrAdd(elementType, type => + typeof(AcWebSignalRHubBase) + .GetMethod(nameof(EnumerateGenericAsync), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)! + .MakeGenericMethod(type)); + + return (IAsyncEnumerable)methodInfo.Invoke(this, [responseData, messageTag, ct])!; + } + + private async IAsyncEnumerable EnumerateGenericAsync(object result, int messageTag, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + var enumerable = (IAsyncEnumerable)result; + await foreach (var item in enumerable.WithCancellation(cancellationToken)) + { + if (item is byte[] bytes) + { + yield return bytes; + } + else if (item is ISignalRMessage sigMsg) + { + yield return SignalRSerializationHelper.SerializeToBinary(sigMsg); + } + else + { + var msg = CreateResponseMessage(messageTag, SignalResponseStatus.Success, item); + yield return SignalRSerializationHelper.SerializeToBinary(msg); + } + } + } + + private static Type? GetAsyncEnumerableElementType(Type type) + { + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>)) + return type.GetGenericArguments()[0]; + + foreach (var intf in type.GetInterfaces()) + { + if (intf.IsGenericType && intf.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>)) + return intf.GetGenericArguments()[0]; + } + + return null; + } + protected virtual async Task ProcessOnReceiveMessage(int messageTag, byte[]? message, int? requestId, Func? notFoundCallback) { var tagName = ConstHelper.NameByValue(messageTag); diff --git a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs index 54b3305..fbb1781 100644 --- a/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs +++ b/AyCode.Services/SignalRs/AcBinaryHubProtocol.cs @@ -1,6 +1,7 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using System.Text; using AyCode.Core.Serializers.Binaries; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.SignalR; @@ -19,11 +20,12 @@ namespace AyCode.Services.SignalRs; /// [1 byte: message type] [message-specific fields serialized via AcBinary] /// /// Message types map 1:1 to SignalR HubMessageType values. -/// Arguments are serialized individually with a VarUInt length prefix each, +/// Arguments are serialized individually with an INT32 length prefix each, /// enabling deferred deserialization via IHubProtocol's binder pattern. /// -/// All writes go directly to the IBufferWriter provided by SignalR via BufferWriterBinaryOutput. -/// Length prefix is patched in-place after payload is written. +/// All writes go through BufferWriterBinaryOutput for zero virtual dispatch +/// on the hot path. Argument payloads are serialized directly to the pipe +/// via AcBinarySerializer (zero-copy). Length prefixes are patched in-place. /// public sealed class AcBinaryHubProtocol : IHubProtocol { @@ -70,135 +72,125 @@ public sealed class AcBinaryHubProtocol : IHubProtocol public ReadOnlyMemory GetMessageBytes(HubMessage message) { - var writer = new ArrayBufferWriter(256); + // +LengthPrefixSize: prevents ArrayBufferWriter resize on first GetMemory, + // which would invalidate the length prefix span obtained before Advance. + var writer = new ArrayBufferWriter(_options.BufferWriterChunkSize + LengthPrefixSize); WriteMessage(message, writer); return writer.WrittenMemory; } public void WriteMessage(HubMessage message, IBufferWriter output) { - // Reserve 4 bytes for the length prefix — we'll patch it after writing the payload. - // GetMemory returns a contiguous block; we keep a reference to write the length later. - var lengthMemory = output.GetMemory(LengthPrefixSize); + // Reserve outer length prefix directly on the pipe (before BWO takes over) + var lengthSpan = output.GetSpan(LengthPrefixSize); output.Advance(LengthPrefixSize); - // Wrap the IBufferWriter in BufferWriterBinaryOutput for optimized writes. - var w = new BufferWriterBinaryOutput(output); + var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize); + int externalBytes = 0; switch (message) { case InvocationMessage m: - WriteInvocation(w, m); + WriteInvocation(ref bw, output, m, ref externalBytes); break; case StreamInvocationMessage m: - WriteStreamInvocation(w, m); + WriteStreamInvocation(ref bw, output, m, ref externalBytes); break; case StreamItemMessage m: - WriteStreamItem(w, m); + WriteStreamItem(ref bw, output, m, ref externalBytes); break; case CompletionMessage m: - WriteCompletion(w, m); + WriteCompletion(ref bw, output, m, ref externalBytes); break; case CancelInvocationMessage m: - WriteCancelInvocation(w, m); + WriteCancelInvocation(ref bw, m); break; case PingMessage: - w.WriteByte(MsgPing); + bw.WriteByte(MsgPing); break; case CloseMessage m: - WriteClose(w, m); + WriteClose(ref bw, m); break; case AckMessage m: - WriteAck(w, m); + bw.WriteByte(MsgAck); + bw.WriteRaw(m.SequenceId); break; case SequenceMessage m: - WriteSequence(w, m); + bw.WriteByte(MsgSequence); + bw.WriteRaw(m.SequenceId); break; default: throw new HubException($"Unexpected message type: {message.GetType().Name}"); } - // Flush pending chunk bytes to the underlying IBufferWriter, then patch length prefix. - w.Flush(); - Unsafe.WriteUnaligned(ref lengthMemory.Span[0], w.Position); + var totalPayload = bw.Position + externalBytes; + bw.Flush(); + Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload); } - private void WriteInvocation(BufferWriterBinaryOutput w, InvocationMessage m) + private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, InvocationMessage m, ref int externalBytes) { - w.WriteByte(MsgInvocation); - WriteNullableString(w, m.InvocationId); - WriteString(w, m.Target); - WriteArguments(w, m.Arguments); - WriteStringArray(w, m.StreamIds); - WriteHeaders(w, m.Headers); + bw.WriteByte(MsgInvocation); + WriteNullableString(ref bw, m.InvocationId); + bw.WriteStringUtf8(m.Target); + WriteArguments(ref bw, output, m.Arguments, ref externalBytes); + WriteStringArray(ref bw, m.StreamIds); + WriteHeaders(ref bw, m.Headers); } - private void WriteStreamInvocation(BufferWriterBinaryOutput w, StreamInvocationMessage m) + private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter output, StreamInvocationMessage m, ref int externalBytes) { - w.WriteByte(MsgStreamInvocation); - WriteString(w, m.InvocationId!); - WriteString(w, m.Target); - WriteArguments(w, m.Arguments); - WriteStringArray(w, m.StreamIds); - WriteHeaders(w, m.Headers); + bw.WriteByte(MsgStreamInvocation); + bw.WriteStringUtf8(m.InvocationId!); + bw.WriteStringUtf8(m.Target); + WriteArguments(ref bw, output, m.Arguments, ref externalBytes); + WriteStringArray(ref bw, m.StreamIds); + WriteHeaders(ref bw, m.Headers); } - private void WriteStreamItem(BufferWriterBinaryOutput w, StreamItemMessage m) + private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter output, StreamItemMessage m, ref int externalBytes) { - w.WriteByte(MsgStreamItem); - WriteString(w, m.InvocationId!); - WriteArgument(w, m.Item); - WriteHeaders(w, m.Headers); + bw.WriteByte(MsgStreamItem); + bw.WriteStringUtf8(m.InvocationId!); + WriteArgument(ref bw, output, m.Item, ref externalBytes); + WriteHeaders(ref bw, m.Headers); } - private void WriteCompletion(BufferWriterBinaryOutput w, CompletionMessage m) + private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter output, CompletionMessage m, ref int externalBytes) { - w.WriteByte(MsgCompletion); - WriteString(w, m.InvocationId!); - WriteNullableString(w, m.Error); + bw.WriteByte(MsgCompletion); + bw.WriteStringUtf8(m.InvocationId!); + WriteNullableString(ref bw, m.Error); - // Result presence flags: 0 = no result, 1 = has result var hasResult = m.HasResult; - w.WriteByte(hasResult ? (byte)1 : (byte)0); + bw.WriteByte(hasResult ? (byte)1 : (byte)0); if (hasResult) - WriteArgument(w, m.Result); + WriteArgument(ref bw, output, m.Result, ref externalBytes); - WriteHeaders(w, m.Headers); + WriteHeaders(ref bw, m.Headers); } - private static void WriteCancelInvocation(BufferWriterBinaryOutput w, CancelInvocationMessage m) + private static void WriteCancelInvocation(ref BufferWriterBinaryOutput bw, CancelInvocationMessage m) { - w.WriteByte(MsgCancelInvocation); - WriteString(w, m.InvocationId!); - WriteHeaders(w, m.Headers); + bw.WriteByte(MsgCancelInvocation); + bw.WriteStringUtf8(m.InvocationId!); + WriteHeaders(ref bw, m.Headers); } - private static void WriteClose(BufferWriterBinaryOutput w, CloseMessage m) + private static void WriteClose(ref BufferWriterBinaryOutput bw, CloseMessage m) { - w.WriteByte(MsgClose); - WriteNullableString(w, m.Error); - w.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0); - } - - private static void WriteAck(BufferWriterBinaryOutput w, AckMessage m) - { - w.WriteByte(MsgAck); - w.WriteRaw(m.SequenceId); - } - - private static void WriteSequence(BufferWriterBinaryOutput w, SequenceMessage m) - { - w.WriteByte(MsgSequence); - w.WriteRaw(m.SequenceId); + bw.WriteByte(MsgClose); + WriteNullableString(ref bw, m.Error); + bw.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0); } #endregion @@ -212,7 +204,6 @@ public sealed class AcBinaryHubProtocol : IHubProtocol if (input.Length < LengthPrefixSize) return false; - // Read length prefix int payloadLength; if (input.FirstSpan.Length >= LengthPrefixSize) { @@ -231,7 +222,6 @@ public sealed class AcBinaryHubProtocol : IHubProtocol var payload = input.Slice(LengthPrefixSize, payloadLength); - // Linearize payload for span-based reading ReadOnlySpan span; byte[]? rentedBuffer = null; @@ -382,29 +372,39 @@ public sealed class AcBinaryHubProtocol : IHubProtocol #endregion - #region Argument Serialization (AcBinary payload per argument) + #region Argument Serialization - private void WriteArguments(BufferWriterBinaryOutput w, object?[] arguments) + private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter output, object?[] arguments, ref int externalBytes) { - w.WriteVarUInt((uint)arguments.Length); + bw.WriteVarUInt((uint)arguments.Length); for (var i = 0; i < arguments.Length; i++) - WriteArgument(w, arguments[i]); + WriteArgument(ref bw, output, arguments[i], ref externalBytes); } - private void WriteArgument(BufferWriterBinaryOutput w, object? value) + private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter output, object? value, ref int externalBytes) { - if (value == null) + if (value is byte[] byteArray) { - w.WriteVarUInt(1); - w.WriteByte(0); // BinaryTypeCode.Null + // byte[] fast-path: size known upfront, write entirely through BWO + var argPayload = 1 + VarUIntSize((uint)byteArray.Length) + byteArray.Length; + bw.WriteRaw(argPayload); + bw.WriteByte(BinaryTypeCode.ByteArray); + bw.WriteVarUInt((uint)byteArray.Length); + bw.WriteBytes(byteArray); return; } - // AcBinarySerializer needs the full payload size upfront (2-pass), - // so we serialize to a pooled byte[] first, then copy length-prefixed. - var serialized = AcBinarySerializer.Serialize(value, _options); - w.WriteVarUInt((uint)serialized.Length); - w.WriteBytes(serialized); + // Flush BWO to pipe, then serialize directly to the pipe via AcBinarySerializer + bw.FlushAndReset(); + + // Reserve arg length prefix directly on the pipe + var argLenSpan = output.GetSpan(LengthPrefixSize); + output.Advance(LengthPrefixSize); + + var argBytes = AcBinarySerializer.Serialize(value, output, _options); + + Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes); + externalBytes += LengthPrefixSize + argBytes; } private object?[] ReadArguments(ref SpanReader r, IReadOnlyList paramTypes) @@ -423,70 +423,79 @@ public sealed class AcBinaryHubProtocol : IHubProtocol private object? ReadSingleArgument(ref SpanReader r, Type targetType) { - var argLength = (int)r.ReadVarUInt(); + var argLength = r.ReadInt32(); if (argLength == 0) return null; var argSpan = r.ReadSpan(argLength); - if (argLength == 1 && argSpan[0] == 0) // BinaryTypeCode.Null + if (argLength == 1 && argSpan[0] == 0) return null; + // byte[] fast-path: bypass deserializer engine + if (targetType == typeof(byte[]) && argSpan.Length > 0 && argSpan[0] == BinaryTypeCode.ByteArray) + { + var byteReader = new SpanReader(argSpan.Slice(1)); + var len = (int)byteReader.ReadVarUInt(); + return byteReader.ReadSpan(len).ToArray(); + } + return AcBinaryDeserializer.Deserialize(argSpan, targetType, _options); } #endregion - #region Framing Helpers (string, nullable string, string array, headers) + #region Framing Helpers [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static void WriteString(BufferWriterBinaryOutput w, string value) - { - w.WriteStringUtf8(value); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static void WriteNullableString(BufferWriterBinaryOutput w, string? value) + private static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value) { if (value == null) { - w.WriteByte(0); // null marker + bw.WriteByte(0); return; } - - w.WriteByte(1); // present marker - w.WriteStringUtf8(value); + bw.WriteByte(1); + bw.WriteStringUtf8(value); } - private static void WriteStringArray(BufferWriterBinaryOutput w, string[]? array) + private static void WriteStringArray(ref BufferWriterBinaryOutput bw, string[]? array) { if (array == null || array.Length == 0) { - w.WriteVarUInt(0); + bw.WriteVarUInt(0); return; } - - w.WriteVarUInt((uint)array.Length); + bw.WriteVarUInt((uint)array.Length); for (var i = 0; i < array.Length; i++) - w.WriteStringUtf8(array[i]); + bw.WriteStringUtf8(array[i]); } - private static void WriteHeaders(BufferWriterBinaryOutput w, IDictionary? headers) + private static void WriteHeaders(ref BufferWriterBinaryOutput bw, IDictionary? headers) { if (headers == null || headers.Count == 0) { - w.WriteVarUInt(0); + bw.WriteVarUInt(0); return; } - - w.WriteVarUInt((uint)headers.Count); + bw.WriteVarUInt((uint)headers.Count); foreach (var kv in headers) { - w.WriteStringUtf8(kv.Key); - w.WriteStringUtf8(kv.Value); + bw.WriteStringUtf8(kv.Key); + bw.WriteStringUtf8(kv.Value); } } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static int VarUIntSize(uint value) + { + if (value < 0x80) return 1; + if (value < 0x4000) return 2; + if (value < 0x200000) return 3; + if (value < 0x10000000) return 4; + return 5; + } + #endregion #region Helpers @@ -552,6 +561,14 @@ public sealed class AcBinaryHubProtocol : IHubProtocol [MethodImpl(MethodImplOptions.AggressiveInlining)] public byte ReadByte() => _span[_pos++]; + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public int ReadInt32() + { + var value = Unsafe.ReadUnaligned(ref Unsafe.AsRef(in _span[_pos])); + _pos += 4; + return value; + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public long ReadInt64() { @@ -589,7 +606,7 @@ public sealed class AcBinaryHubProtocol : IHubProtocol if (byteCount == 0) return string.Empty; var bytes = ReadSpan(byteCount); - return System.Text.Encoding.UTF8.GetString(bytes); + return Encoding.UTF8.GetString(bytes); } public string? ReadNullableString() diff --git a/AyCode.Services/SignalRs/AcSignalRClientBase.cs b/AyCode.Services/SignalRs/AcSignalRClientBase.cs index 6cd017e..a1fcd11 100644 --- a/AyCode.Services/SignalRs/AcSignalRClientBase.cs +++ b/AyCode.Services/SignalRs/AcSignalRClientBase.cs @@ -69,6 +69,11 @@ namespace AyCode.Services.SignalRs HubConnection = null; } + public virtual IAsyncEnumerable OnReceiveStreamMessage(int messageTag, byte[]? messageBytes) + { + throw new NotSupportedException("Client does not support serving streams to the server. Streams are established Server-to-Client only."); + } + private Task HubConnection_Closed(Exception? arg) { if (_responseByRequestId.IsEmpty) Logger.DebugConditional("Client HubConnection_Closed"); @@ -187,6 +192,49 @@ namespace AyCode.Services.SignalRs public virtual Task GetAllAsync(int messageTag, Func responseCallback, object[]? contextParams) => SendMessageToServerAsync(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextParams)), responseCallback); + public virtual async IAsyncEnumerable StreamAllAsync(int messageTag, object[]? contextParams = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await StartConnection(); + + if (HubConnection == null || !IsConnected()) + { + Logger.Error($"Client StreamAllAsync error! ConnectionState: {GetConnectionState()};"); + yield break; + } + + var message = contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage(new IdMessage(contextParams)); + var msgBytes = message != null ? SignalRSerializationHelper.SerializeToBinary(message) : null; + + var stream = HubConnection.StreamAsync( + nameof(IAcSignalRHubBase.OnReceiveStreamMessage), + messageTag, + msgBytes, + cancellationToken); + + await foreach (var bytes in stream.WithCancellation(cancellationToken)) + { + if (bytes == null) continue; + + if (typeof(TResponseData) == typeof(byte[])) + { + yield return (TResponseData)(object)bytes; + continue; + } + + var responseMessage = SignalRSerializationHelper.DeserializeFromBinary(bytes); + if (responseMessage != null) + { + if (responseMessage.Status == SignalResponseStatus.Error) + { + var errorText = $"Client StreamAllAsync error; tag: {messageTag}; Status: {responseMessage.Status}"; + Logger.Error(errorText); + throw new Exception(errorText); + } + yield return responseMessage.GetResponseData(); + } + } + } + public virtual Task PostDataAsync(int messageTag, TPostData postData) where TPostData : class => SendMessageToServerAsync(messageTag, CreatePostMessage(postData), GetNextRequestId()); @@ -217,6 +265,49 @@ namespace AyCode.Services.SignalRs return SendMessageToServerAsync(messageTag, CreatePostMessage(postData), requestId); } + public virtual async IAsyncEnumerable StreamPostDataAsync(int messageTag, TPostData postData, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await StartConnection(); + + if (HubConnection == null || !IsConnected()) + { + Logger.Error($"Client StreamPostDataAsync error! ConnectionState: {GetConnectionState()};"); + yield break; + } + + var message = CreatePostMessage(postData); + var msgBytes = SignalRSerializationHelper.SerializeToBinary(message); + + var stream = HubConnection.StreamAsync( + nameof(IAcSignalRHubBase.OnReceiveStreamMessage), + messageTag, + msgBytes, + cancellationToken); + + await foreach (var bytes in stream.WithCancellation(cancellationToken)) + { + if (bytes == null) continue; + + if (typeof(TResponseData) == typeof(byte[])) + { + yield return (TResponseData)(object)bytes; + continue; + } + + var responseMessage = SignalRSerializationHelper.DeserializeFromBinary(bytes); + if (responseMessage != null) + { + if (responseMessage.Status == SignalResponseStatus.Error) + { + var errorText = $"Client StreamPostDataAsync error; tag: {messageTag}; Status: {responseMessage.Status}"; + Logger.Error(errorText); + throw new Exception(errorText); + } + yield return responseMessage.GetResponseData(); + } + } + } + private static ISignalRMessage CreatePostMessage(TPostData postData) { var type = typeof(TPostData); diff --git a/AyCode.Services/SignalRs/IAcSignalRHubBase.cs b/AyCode.Services/SignalRs/IAcSignalRHubBase.cs index 7ec6f9c..e5c4fa9 100644 --- a/AyCode.Services/SignalRs/IAcSignalRHubBase.cs +++ b/AyCode.Services/SignalRs/IAcSignalRHubBase.cs @@ -4,4 +4,6 @@ public interface IAcSignalRHubBase { //Task OnRequestMessage(int messageTag, int requestId); Task OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId); + + IAsyncEnumerable OnReceiveStreamMessage(int messageTag, byte[]? messageBytes); } \ No newline at end of file diff --git a/AyCode.Services/SignalRs/README.md b/AyCode.Services/SignalRs/README.md index d88ca9e..378a81e 100644 --- a/AyCode.Services/SignalRs/README.md +++ b/AyCode.Services/SignalRs/README.md @@ -7,7 +7,7 @@ Custom binary SignalR protocol, client infrastructure, message tagging, and seri ## Key Files ### Protocol -- **`AcBinaryHubProtocol.cs`** — Custom `IHubProtocol` replacing JSON+Base64 with `AcBinarySerializer`. Handles all 9 SignalR message types (Invocation, StreamItem, Completion, Ping, Close, etc.). Inner `SpanReader` ref struct for zero-alloc parsing. +- **`AcBinaryHubProtocol.cs`** — Custom `IHubProtocol` replacing JSON+Base64 with `AcBinarySerializer`. Handles all 9 SignalR message types (Invocation, StreamItem, Completion, Ping, Close, etc.). Uses `BufferWriterBinaryOutput` standalone mode for zero-copy writes to the SignalR pipe. `byte[]` fast-path bypasses the serializer entirely. Inner `SpanReader` ref struct for zero-alloc parsing. ### Client - **`AcSignalRClientBase.cs`** — Abstract SignalR client managing `HubConnection`, request/response tracking via pooled `SignalRRequestModel`. Methods: `SendMessageToServerAsync()`, CRUD helpers (Post, Get, GetAll, GetAllInto). Configurable timeouts. diff --git a/AyCode.Services/docs/SIGNALR.md b/AyCode.Services/docs/SIGNALR.md index 9b0e46e..f07e792 100644 --- a/AyCode.Services/docs/SIGNALR.md +++ b/AyCode.Services/docs/SIGNALR.md @@ -101,7 +101,13 @@ Custom `IHubProtocol` (name: `"acbinary"`), replaces default JSON. Frame format: Message types: Invocation(1), StreamItem(2), Completion(3), Ping(6), Close(7), Ack(8), Sequence(9). -Arguments serialized individually with VarUInt length prefix. Direct write to `IBufferWriter` via `BufferWriterBinaryOutput`. +Arguments serialized individually with INT32 length prefix (patched in-place after payload is written). + +**Zero-copy write pipeline:** +All writes go through a single `BufferWriterBinaryOutput` in standalone mode (cached chunk pattern, zero virtual dispatch). For argument payloads, the BWO flushes to the pipe via `FlushAndReset()`, then `AcBinarySerializer.Serialize()` writes directly to the `IBufferWriter` (pipe) — zero-copy, no intermediate `byte[]` allocation. + +**Raw `byte[]` Fast-Path:** +When an argument is a pure `byte[]`, the size is known upfront. The protocol writes `BinaryTypeCode.ByteArray` (68) marker, `VarUInt` length, and raw bytes entirely through the `BufferWriterBinaryOutput`, completely skipping the `AcBinarySerializer` context, the two-phase scan, and the internal array pools. ### Response Message diff --git a/CLAUDE.md b/CLAUDE.md index 577d61b..d8a5d03 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1 +1,17 @@ CRITICAL: Your FIRST action in every session MUST be reading `.github/copilot-instructions.md`. Execute ALL session-start instructions found there before responding to any user query. It is the single source of truth for this repo. + +## SEQUENTIAL EXECUTION OVERRIDE +The AI AGENT CORE PROTOCOL in copilot-instructions.md requires STRICT SEQUENTIAL execution. This OVERRIDES your default parallelization behavior. Do NOT parallelize doc reads with code searches. The sequence is: +1. Read copilot-instructions.md → process its rules FULLY +2. Read ALL docs/ .md files listed in the protocol → wait for completion +3. Output [LOADED_DOCS: ...] prefix +4. ONLY THEN respond to the user's query or search code + +## Tool mapping for AI AGENT CORE PROTOCOL +The copilot-instructions.md references Copilot tool names. Map them to Claude Code tools: +- `get_file` / `file_search` → `Read`, `Glob`, `Grep` +- `code_search` / `get_symbols_by_name` / `find_symbol` → `Grep`, `Glob` +- `replace_string_in_file` / `edit_file` → `Edit` +- `create_file` → `Write` + +Follow the protocol using YOUR tools. The rules (LOADED_DOCS prefix, hard-gate, no-re-read, context recovery, explicit consent) apply equally to Claude Code. diff --git a/docs/GLOSSARY.md b/docs/GLOSSARY.md index 1496431..c65c8a1 100644 --- a/docs/GLOSSARY.md +++ b/docs/GLOSSARY.md @@ -74,7 +74,7 @@ For full architecture see `AyCode.Services/docs/SIGNALR.md`. | **Message Tag** | Integer identifier mapping to a method via `[SignalR(tag)]` or `[SignalRSendToClient(tag)]` attributes. | | **DynamicMethodRegistry** | Resolves message tags to `MethodInfo` at runtime. Static `ConcurrentDictionary` cache with lazy scan on miss. | | **SignalRCrudTags** | Sealed class bundling 5 independent tag integers (getAllTag, getItemTag, addTag, updateTag, removeTag) for entity CRUD. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`. | -| **AcBinaryHubProtocol** | Custom `IHubProtocol` replacing SignalR's JSON+Base64 with `AcBinarySerializer`. Protocol name: `"acbinary"`. | +| **AcBinaryHubProtocol** | Custom `IHubProtocol` replacing SignalR's JSON+Base64 with `AcBinarySerializer`. Protocol name: `"acbinary"`. Uses `BufferWriterBinaryOutput` for zero-copy writes to the SignalR pipe. | | **SignalResponseDataMessage** | Response message supporting Binary or JSON+GZip. Responses use pure Binary (no JSON overhead). | | **SignalPostJsonDataMessage** | ⚠️ TECH DEBT — request params serialized to JSON inside Binary envelope. Planned for pure Binary replacement. | | **AcSignalRDataSource** | Generic real-time `IList` with change tracking, CRUD via SignalRCrudTags, binary merge, rollback, sync state. |