SignalR: Add streaming & zero-copy binary protocol

- Introduce OnReceiveStreamMessage for server/client streaming via IAsyncEnumerable<byte[]>
- AcBinaryHubProtocol: switch argument framing to INT32, enable direct zero-copy serialization to SignalR pipe
- Optimize byte[] argument handling (fast-path, no extra alloc)
- BufferWriterBinaryOutput: support configurable chunk size, add FlushAndReset
- AcBinarySerializer: IBufferWriter overload returns bytes written
- Update docs for streaming, protocol, and performance guidance
- Minor refactoring, add InternalsVisibleTo, improve comments
This commit is contained in:
Loretta 2026-04-04 00:47:48 +02:00
parent 896ee257c4
commit 0cb2b6c2d8
16 changed files with 411 additions and 131 deletions

View File

@ -12,6 +12,7 @@ You are operating in a multi-repo, documentation-first architecture. You MUST ST
- Your VERY FIRST AND ONLY allowed tool calls must be `file_search` or `get_file` targeting the `.md` documentation in the relevant `docs/` folders or `README.md`.
- Do not answer the user's core question until the `[LOADED_DOCS]` list is populated with the base architecture files.
- **CRITICAL EXCEPTION:** Do **NOT** re-read `.md` files that are already mapped in your context or `LOADED_DOCS` list (strictly maintain rule 20).
- **CROSS-REPO HARD-GATE:** When navigating to an external repo (via `own-dep-repos` paths), read that repo's `docs/` and `README.md` BEFORE searching its source code. The hard-gate applies to EVERY repo you enter, not just your own.
3. **STRICT NO-RE-READ POLICY (ANTI-LOOP):**
You are PHYSICALLY FORBIDDEN from calling `get_file` or `file_search` on any `.md` file that is already listed in your `[LOADED_DOCS]` prefix.
@ -54,7 +55,7 @@ You are operating in a multi-repo, documentation-first architecture. You MUST ST
6. **AcJson** — Newtonsoft.Json wrapper with $id/$ref, IId-based reference resolution, and chain API.
## SignalR
7. **Single-method transport** — all SignalR communication uses `OnReceiveMessage(tag, bytes, requestId)`. Tags are `int` constants resolved via `DynamicMethodRegistry`. Never add conventional hub methods.
7. **Tag-based transport (no conventional hub methods)** — SignalR communication should generally use the generic methods provided by `AcWebSignalRHubBase` (server) and `AcSignalRClientBase` (client). Request types are conventionally identified by `int` tags. Try to avoid adding custom, business-specific, or conventional string-based Hub methods (e.g., `GetUsers()`).
8. **AcSignalRDataSource** — generic `IList<T>` with change tracking, CRUD via `SignalRCrudTags`, binary merge, rollback. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`. Transport docs: `AyCode.Services/docs/SIGNALR.md`.
9. **JSON-in-Binary tech debt** — client→server request parameters are currently JSON inside a Binary envelope (`SignalPostJsonDataMessage`). Do NOT attempt to fix as a side effect — requires coordinated changes across all consuming projects.

View File

@ -290,10 +290,10 @@ public static partial class AcBinarySerializer
public static byte[] Serialize<T>(T value) => Serialize(value, AcBinarySerializerOptions.Default);
/// <summary>
/// Serialize object to an IBufferWriter with default options.
/// Serialize object to an IBufferWriter with default options. Returns bytes written.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Serialize<T>(T value, IBufferWriter<byte> writer) => Serialize(value, writer, AcBinarySerializerOptions.Default);
public static int Serialize<T>(T value, IBufferWriter<byte> writer) => Serialize(value, writer, AcBinarySerializerOptions.Default);
/// <summary>
/// Serialize object to binary with specified options.
@ -381,14 +381,14 @@ public static partial class AcBinarySerializer
/// Uses BufferWriterBinaryOutput — writes directly to the caller's buffer.
/// Note: Compression is applied if enabled in options.
/// </summary>
public static void Serialize<T>(T value, IBufferWriter<byte> writer, AcBinarySerializerOptions options)
public static int Serialize<T>(T value, IBufferWriter<byte> writer, AcBinarySerializerOptions options)
{
if (value == null)
{
var span = writer.GetSpan(1);
span[0] = BinaryTypeCode.Null;
writer.Advance(1);
return;
return 1;
}
var runtimeType = value.GetType();
@ -408,7 +408,7 @@ public static partial class AcBinarySerializer
}
var context = BinarySerializationContextPool<BufferWriterBinaryOutput>.Get(options);
context.Output = new BufferWriterBinaryOutput(writer);
context.Output = new BufferWriterBinaryOutput(writer, options.BufferWriterChunkSize);
context.Output.Initialize(out context._buffer, out context._position, out context._bufferEnd);
try
@ -420,17 +420,15 @@ public static partial class AcBinarySerializer
// Apply compression if enabled
if (options.UseCompression != Lz4CompressionMode.None)
{
// For compression with BufferWriter, we need to flush first then compress
// This path is less common — compression typically uses byte[] path
context.Output.Flush(context._buffer, context._position);
// Compression with IBufferWriter requires intermediate buffer
// Fall back to ArrayBinaryOutput path for compression
throw new NotSupportedException(
"Compression is not supported with IBufferWriter output. " +
"Use the byte[] overload or disable compression.");
}
var bytesWritten = context.Output.GetTotalPosition(context._position);
context.Output.Flush(context._buffer, context._position);
return bytesWritten;
}
finally
{

View File

@ -130,6 +130,33 @@ public sealed class AcBinarySerializerOptions : AcSerializerOptions
/// </summary>
public int InitialBufferCapacity { get; init; } = 4096;
/// <summary>
/// Chunk size (in bytes) used by <see cref="BufferWriterBinaryOutput"/> when writing to an <see cref="System.Buffers.IBufferWriter{T}"/>.
/// Controls how much data is accumulated before committing (Advance + GetMemory) to the underlying writer.
///
/// <para><b>How it works:</b> The serializer writes into a chunk buffer. When the chunk fills up,
/// it commits the written bytes to the IBufferWriter and acquires a new chunk. Larger chunks mean
/// fewer Grow() calls (less overhead), but consume more memory per chunk. Smaller chunks reduce
/// memory footprint and latency-to-first-byte for streaming, but increase Grow() call frequency.</para>
///
/// <para><b>Choosing a value:</b></para>
/// <list type="bullet">
/// <item><b>Memory-backed writers</b> (ArrayPooledBufferWriter, file/DB blob): use 65536 (64 KB, the default).
/// Stays below the .NET LOH threshold (85 KB), minimizes Grow() overhead for large payloads.
/// An 8 MB payload triggers ~128 Grow() calls.</item>
/// <item><b>Network streaming</b> (Kestrel PipeWriter, SignalR): use 4096 (4 KB).
/// Aligns with Kestrel's default memory pool slab size and TCP segment sizes (~1500 byte MTU × 3).
/// Reduces latency-to-first-byte by flushing data to the wire sooner.</item>
/// </list>
///
/// <para><b>Impact of wrong value:</b> Using 64 KB on a network stream adds minor latency for the first chunk.
/// Using 4 KB for a memory-backed writer causes ~16× more Grow() calls than necessary (2048 vs 128 for 8 MB).
/// The default (64 KB) is the safe choice — suboptimal on network streams but never catastrophic.</para>
///
/// Default: 65536 (64 KB)
/// </summary>
public int BufferWriterChunkSize { get; init; } = 65536;
/// <summary>
/// Optional property-level filter invoked before metadata registration and serialization.
/// Return false to exclude the property from the payload.

View File

@ -61,10 +61,12 @@ public struct ArrayBinaryOutput : IBinaryOutputBase, IDisposable
#region Result Extraction receive buffer/position from context
//TODO: miért nem static a AsSpan?
/// <summary>Returns the written data as a ReadOnlySpan without allocation.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ReadOnlySpan<byte> AsSpan(byte[] buffer, int position) => buffer.AsSpan(0, position);
//TODO: miért nem static a ToArray? Miért nem valami static common osztályban van?
/// <summary>Copies the written data to a new exactly-sized array.</summary>
public byte[] ToArray(byte[] buffer, int position)
{
@ -81,6 +83,7 @@ public struct ArrayBinaryOutput : IBinaryOutputBase, IDisposable
writer.Advance(position);
}
//TODO: miért nem static a DetachResult?
/// <summary>
/// Detaches the internal buffer as a BinarySerializationResult and allocates a fresh buffer.
/// The caller owns the returned result and must dispose it to return the buffer to the pool.

View File

@ -1,5 +1,7 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("AyCode.Services")]
namespace AyCode.Core.Serializers.Binaries;
/// <summary>

View File

@ -23,9 +23,8 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase
{
private static readonly Encoding Utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
private const int MinChunkRequest = 256;
private readonly IBufferWriter<byte> _writer;
private readonly int _chunkSize;
private int _committedBytes; // total bytes Advanced to writer so far
private int _currentChunkStart; // _position value at start of current chunk
private bool _ownedBuffer; // true if current buffer is from ArrayPool (fallback path)
@ -35,12 +34,13 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase
private int _position;
private int _bufferEnd;
public BufferWriterBinaryOutput(IBufferWriter<byte> writer)
public BufferWriterBinaryOutput(IBufferWriter<byte> writer, int chunkSize = 65536)
{
_writer = writer;
_chunkSize = chunkSize;
// Initialize standalone buffer for direct write usage
_committedBytes = 0;
AcquireChunk(MinChunkRequest, out _buffer, out _position, out _bufferEnd);
AcquireChunk(_chunkSize, out _buffer, out _position, out _bufferEnd);
_currentChunkStart = _position;
}
@ -51,7 +51,7 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase
public void Initialize(out byte[] buffer, out int position, out int bufferEnd)
{
_committedBytes = 0;
AcquireChunk(MinChunkRequest, out buffer, out position, out bufferEnd);
AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd);
_currentChunkStart = position;
}
@ -78,7 +78,7 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase
}
// Acquire new chunk
AcquireChunk(Math.Max(needed, MinChunkRequest), out buffer, out position, out bufferEnd);
AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd);
_currentChunkStart = position;
}
@ -125,7 +125,7 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase
private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd)
{
// Use GetMemory so we can extract the backing array via TryGetArray
var actualRequest = Math.Max(requestSize, MinChunkRequest);
var actualRequest = Math.Max(requestSize, _chunkSize);
var memory = _writer.GetMemory(actualRequest);
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) && segment.Array != null)
@ -248,5 +248,26 @@ public struct BufferWriterBinaryOutput : IBinaryOutputBase
Flush(_buffer, _position);
}
/// <summary>
/// Commits pending bytes and invalidates the current chunk so that the underlying
/// IBufferWriter can be used directly (e.g. by AcBinarySerializer).
/// The next standalone write will re-acquire a fresh chunk via Grow.
/// </summary>
public void FlushAndReset()
{
var bytesInChunk = _position - _currentChunkStart;
if (bytesInChunk > 0)
{
if (_ownedBuffer)
FlushOwnedBuffer(_buffer, bytesInChunk);
else
_writer.Advance(bytesInChunk);
_committedBytes += bytesInChunk;
}
// Invalidate chunk — next write triggers Grow → AcquireChunk
_position = _bufferEnd;
_currentChunkStart = _bufferEnd;
}
#endregion
}

View File

@ -53,7 +53,7 @@ For the complete wire format specification (encoding rules, header format, inter
### I/O Strategies
- **`BinaryOutputBase.cs`** — Output interface.
- **`ArrayBinaryOutput.cs`** — `ArrayPool`-backed output, fastest for `byte[]` result.
- **`BufferWriterBinaryOutput.cs`** — `IBufferWriter<byte>`-backed output for streaming.
- **`BufferWriterBinaryOutput.cs`** — `IBufferWriter<byte>`-backed output for streaming. Two modes: context mode (serialization pipeline) and standalone mode (direct write methods for framing, e.g. `AcBinaryHubProtocol`).
- **`ArrayPooledBufferWriter.cs`** — Concrete `IBufferWriter` implementation.
- **`IBinaryInputBase.cs`** — Input interface.
- **`ArrayBinaryInput.cs`** — Single contiguous `byte[]` input.

File diff suppressed because one or more lines are too long

View File

@ -65,6 +65,102 @@ public abstract class AcWebSignalRHubBase<TSignalRTags, TLogger>(IConfiguration
return ProcessOnReceiveMessage(messageTag, messageBytes, requestId, null);
}
public virtual IAsyncEnumerable<byte[]> OnReceiveStreamMessage(int messageTag, byte[]? messageBytes)
{
return ProcessOnStreamMessage(messageTag, messageBytes, Context.ConnectionAborted);
}
protected virtual async IAsyncEnumerable<byte[]> ProcessOnStreamMessage(int messageTag, byte[]? message, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);
Logger.Debug($"[{message?.Length ?? 0:N0}b] Server OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}");
try
{
if (AcDomain.IsDeveloperVersion) LogContextUserNameAndId();
if (TryFindAndInvokeMethod(messageTag, message, tagName, out var responseData))
{
if (responseData == null)
yield break;
var resultType = responseData.GetType();
var elementType = GetAsyncEnumerableElementType(resultType);
if (elementType != null)
{
var typedEnumerable = GetTypedStream(elementType, responseData, messageTag, cancellationToken);
await foreach (var chunk in typedEnumerable.WithCancellation(cancellationToken))
{
yield return chunk;
}
}
else
{
Logger.Warning($"Method '{tagName}' does not return IAsyncEnumerable. Returning normal message as single chunk.");
var responseMessage = CreateResponseMessage(messageTag, SignalResponseStatus.Success, responseData);
yield return SignalRSerializationHelper.SerializeToBinary(responseMessage);
}
}
else
{
Logger.Warning($"Not found dynamic method for the tag! {tagName}");
}
}
finally
{
Logger.Debug($"Server closed OnReceiveStreamMessage; ConnectionId: {GetConnectionId()}; {tagName}");
}
}
private static readonly System.Collections.Concurrent.ConcurrentDictionary<Type, System.Reflection.MethodInfo> _streamMethods = new();
private IAsyncEnumerable<byte[]> GetTypedStream(Type elementType, object responseData, int messageTag, CancellationToken ct)
{
var methodInfo = _streamMethods.GetOrAdd(elementType, type =>
typeof(AcWebSignalRHubBase<TSignalRTags, TLogger>)
.GetMethod(nameof(EnumerateGenericAsync), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
.MakeGenericMethod(type));
return (IAsyncEnumerable<byte[]>)methodInfo.Invoke(this, [responseData, messageTag, ct])!;
}
private async IAsyncEnumerable<byte[]> EnumerateGenericAsync<T>(object result, int messageTag, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
var enumerable = (IAsyncEnumerable<T>)result;
await foreach (var item in enumerable.WithCancellation(cancellationToken))
{
if (item is byte[] bytes)
{
yield return bytes;
}
else if (item is ISignalRMessage sigMsg)
{
yield return SignalRSerializationHelper.SerializeToBinary(sigMsg);
}
else
{
var msg = CreateResponseMessage(messageTag, SignalResponseStatus.Success, item);
yield return SignalRSerializationHelper.SerializeToBinary(msg);
}
}
}
private static Type? GetAsyncEnumerableElementType(Type type)
{
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>))
return type.GetGenericArguments()[0];
foreach (var intf in type.GetInterfaces())
{
if (intf.IsGenericType && intf.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>))
return intf.GetGenericArguments()[0];
}
return null;
}
protected virtual async Task ProcessOnReceiveMessage(int messageTag, byte[]? message, int? requestId, Func<string, Task>? notFoundCallback)
{
var tagName = ConstHelper.NameByValue<TSignalRTags>(messageTag);

View File

@ -1,6 +1,7 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Text;
using AyCode.Core.Serializers.Binaries;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR;
@ -19,11 +20,12 @@ namespace AyCode.Services.SignalRs;
/// [1 byte: message type] [message-specific fields serialized via AcBinary]
///
/// Message types map 1:1 to SignalR HubMessageType values.
/// Arguments are serialized individually with a VarUInt length prefix each,
/// Arguments are serialized individually with an INT32 length prefix each,
/// enabling deferred deserialization via IHubProtocol's binder pattern.
///
/// All writes go directly to the IBufferWriter provided by SignalR via BufferWriterBinaryOutput.
/// Length prefix is patched in-place after payload is written.
/// All writes go through BufferWriterBinaryOutput for zero virtual dispatch
/// on the hot path. Argument payloads are serialized directly to the pipe
/// via AcBinarySerializer (zero-copy). Length prefixes are patched in-place.
/// </summary>
public sealed class AcBinaryHubProtocol : IHubProtocol
{
@ -70,135 +72,125 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
{
var writer = new ArrayBufferWriter<byte>(256);
// +LengthPrefixSize: prevents ArrayBufferWriter resize on first GetMemory,
// which would invalidate the length prefix span obtained before Advance.
var writer = new ArrayBufferWriter<byte>(_options.BufferWriterChunkSize + LengthPrefixSize);
WriteMessage(message, writer);
return writer.WrittenMemory;
}
public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
{
// Reserve 4 bytes for the length prefix — we'll patch it after writing the payload.
// GetMemory returns a contiguous block; we keep a reference to write the length later.
var lengthMemory = output.GetMemory(LengthPrefixSize);
// Reserve outer length prefix directly on the pipe (before BWO takes over)
var lengthSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize);
// Wrap the IBufferWriter in BufferWriterBinaryOutput for optimized writes.
var w = new BufferWriterBinaryOutput(output);
var bw = new BufferWriterBinaryOutput(output, _options.BufferWriterChunkSize);
int externalBytes = 0;
switch (message)
{
case InvocationMessage m:
WriteInvocation(w, m);
WriteInvocation(ref bw, output, m, ref externalBytes);
break;
case StreamInvocationMessage m:
WriteStreamInvocation(w, m);
WriteStreamInvocation(ref bw, output, m, ref externalBytes);
break;
case StreamItemMessage m:
WriteStreamItem(w, m);
WriteStreamItem(ref bw, output, m, ref externalBytes);
break;
case CompletionMessage m:
WriteCompletion(w, m);
WriteCompletion(ref bw, output, m, ref externalBytes);
break;
case CancelInvocationMessage m:
WriteCancelInvocation(w, m);
WriteCancelInvocation(ref bw, m);
break;
case PingMessage:
w.WriteByte(MsgPing);
bw.WriteByte(MsgPing);
break;
case CloseMessage m:
WriteClose(w, m);
WriteClose(ref bw, m);
break;
case AckMessage m:
WriteAck(w, m);
bw.WriteByte(MsgAck);
bw.WriteRaw(m.SequenceId);
break;
case SequenceMessage m:
WriteSequence(w, m);
bw.WriteByte(MsgSequence);
bw.WriteRaw(m.SequenceId);
break;
default:
throw new HubException($"Unexpected message type: {message.GetType().Name}");
}
// Flush pending chunk bytes to the underlying IBufferWriter, then patch length prefix.
w.Flush();
Unsafe.WriteUnaligned(ref lengthMemory.Span[0], w.Position);
var totalPayload = bw.Position + externalBytes;
bw.Flush();
Unsafe.WriteUnaligned(ref lengthSpan[0], totalPayload);
}
private void WriteInvocation(BufferWriterBinaryOutput w, InvocationMessage m)
private void WriteInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, InvocationMessage m, ref int externalBytes)
{
w.WriteByte(MsgInvocation);
WriteNullableString(w, m.InvocationId);
WriteString(w, m.Target);
WriteArguments(w, m.Arguments);
WriteStringArray(w, m.StreamIds);
WriteHeaders(w, m.Headers);
bw.WriteByte(MsgInvocation);
WriteNullableString(ref bw, m.InvocationId);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
private void WriteStreamInvocation(BufferWriterBinaryOutput w, StreamInvocationMessage m)
private void WriteStreamInvocation(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamInvocationMessage m, ref int externalBytes)
{
w.WriteByte(MsgStreamInvocation);
WriteString(w, m.InvocationId!);
WriteString(w, m.Target);
WriteArguments(w, m.Arguments);
WriteStringArray(w, m.StreamIds);
WriteHeaders(w, m.Headers);
bw.WriteByte(MsgStreamInvocation);
bw.WriteStringUtf8(m.InvocationId!);
bw.WriteStringUtf8(m.Target);
WriteArguments(ref bw, output, m.Arguments, ref externalBytes);
WriteStringArray(ref bw, m.StreamIds);
WriteHeaders(ref bw, m.Headers);
}
private void WriteStreamItem(BufferWriterBinaryOutput w, StreamItemMessage m)
private void WriteStreamItem(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, StreamItemMessage m, ref int externalBytes)
{
w.WriteByte(MsgStreamItem);
WriteString(w, m.InvocationId!);
WriteArgument(w, m.Item);
WriteHeaders(w, m.Headers);
bw.WriteByte(MsgStreamItem);
bw.WriteStringUtf8(m.InvocationId!);
WriteArgument(ref bw, output, m.Item, ref externalBytes);
WriteHeaders(ref bw, m.Headers);
}
private void WriteCompletion(BufferWriterBinaryOutput w, CompletionMessage m)
private void WriteCompletion(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, CompletionMessage m, ref int externalBytes)
{
w.WriteByte(MsgCompletion);
WriteString(w, m.InvocationId!);
WriteNullableString(w, m.Error);
bw.WriteByte(MsgCompletion);
bw.WriteStringUtf8(m.InvocationId!);
WriteNullableString(ref bw, m.Error);
// Result presence flags: 0 = no result, 1 = has result
var hasResult = m.HasResult;
w.WriteByte(hasResult ? (byte)1 : (byte)0);
bw.WriteByte(hasResult ? (byte)1 : (byte)0);
if (hasResult)
WriteArgument(w, m.Result);
WriteArgument(ref bw, output, m.Result, ref externalBytes);
WriteHeaders(w, m.Headers);
WriteHeaders(ref bw, m.Headers);
}
private static void WriteCancelInvocation(BufferWriterBinaryOutput w, CancelInvocationMessage m)
private static void WriteCancelInvocation(ref BufferWriterBinaryOutput bw, CancelInvocationMessage m)
{
w.WriteByte(MsgCancelInvocation);
WriteString(w, m.InvocationId!);
WriteHeaders(w, m.Headers);
bw.WriteByte(MsgCancelInvocation);
bw.WriteStringUtf8(m.InvocationId!);
WriteHeaders(ref bw, m.Headers);
}
private static void WriteClose(BufferWriterBinaryOutput w, CloseMessage m)
private static void WriteClose(ref BufferWriterBinaryOutput bw, CloseMessage m)
{
w.WriteByte(MsgClose);
WriteNullableString(w, m.Error);
w.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0);
}
private static void WriteAck(BufferWriterBinaryOutput w, AckMessage m)
{
w.WriteByte(MsgAck);
w.WriteRaw(m.SequenceId);
}
private static void WriteSequence(BufferWriterBinaryOutput w, SequenceMessage m)
{
w.WriteByte(MsgSequence);
w.WriteRaw(m.SequenceId);
bw.WriteByte(MsgClose);
WriteNullableString(ref bw, m.Error);
bw.WriteByte(m.AllowReconnect ? (byte)1 : (byte)0);
}
#endregion
@ -212,7 +204,6 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
if (input.Length < LengthPrefixSize)
return false;
// Read length prefix
int payloadLength;
if (input.FirstSpan.Length >= LengthPrefixSize)
{
@ -231,7 +222,6 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
var payload = input.Slice(LengthPrefixSize, payloadLength);
// Linearize payload for span-based reading
ReadOnlySpan<byte> span;
byte[]? rentedBuffer = null;
@ -382,29 +372,39 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
#endregion
#region Argument Serialization (AcBinary payload per argument)
#region Argument Serialization
private void WriteArguments(BufferWriterBinaryOutput w, object?[] arguments)
private void WriteArguments(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object?[] arguments, ref int externalBytes)
{
w.WriteVarUInt((uint)arguments.Length);
bw.WriteVarUInt((uint)arguments.Length);
for (var i = 0; i < arguments.Length; i++)
WriteArgument(w, arguments[i]);
WriteArgument(ref bw, output, arguments[i], ref externalBytes);
}
private void WriteArgument(BufferWriterBinaryOutput w, object? value)
private void WriteArgument(ref BufferWriterBinaryOutput bw, IBufferWriter<byte> output, object? value, ref int externalBytes)
{
if (value == null)
if (value is byte[] byteArray)
{
w.WriteVarUInt(1);
w.WriteByte(0); // BinaryTypeCode.Null
// byte[] fast-path: size known upfront, write entirely through BWO
var argPayload = 1 + VarUIntSize((uint)byteArray.Length) + byteArray.Length;
bw.WriteRaw(argPayload);
bw.WriteByte(BinaryTypeCode.ByteArray);
bw.WriteVarUInt((uint)byteArray.Length);
bw.WriteBytes(byteArray);
return;
}
// AcBinarySerializer needs the full payload size upfront (2-pass),
// so we serialize to a pooled byte[] first, then copy length-prefixed.
var serialized = AcBinarySerializer.Serialize(value, _options);
w.WriteVarUInt((uint)serialized.Length);
w.WriteBytes(serialized);
// Flush BWO to pipe, then serialize directly to the pipe via AcBinarySerializer
bw.FlushAndReset();
// Reserve arg length prefix directly on the pipe
var argLenSpan = output.GetSpan(LengthPrefixSize);
output.Advance(LengthPrefixSize);
var argBytes = AcBinarySerializer.Serialize(value, output, _options);
Unsafe.WriteUnaligned(ref argLenSpan[0], argBytes);
externalBytes += LengthPrefixSize + argBytes;
}
private object?[] ReadArguments(ref SpanReader r, IReadOnlyList<Type> paramTypes)
@ -423,70 +423,79 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
private object? ReadSingleArgument(ref SpanReader r, Type targetType)
{
var argLength = (int)r.ReadVarUInt();
var argLength = r.ReadInt32();
if (argLength == 0)
return null;
var argSpan = r.ReadSpan(argLength);
if (argLength == 1 && argSpan[0] == 0) // BinaryTypeCode.Null
if (argLength == 1 && argSpan[0] == 0)
return null;
// byte[] fast-path: bypass deserializer engine
if (targetType == typeof(byte[]) && argSpan.Length > 0 && argSpan[0] == BinaryTypeCode.ByteArray)
{
var byteReader = new SpanReader(argSpan.Slice(1));
var len = (int)byteReader.ReadVarUInt();
return byteReader.ReadSpan(len).ToArray();
}
return AcBinaryDeserializer.Deserialize(argSpan, targetType, _options);
}
#endregion
#region Framing Helpers (string, nullable string, string array, headers)
#region Framing Helpers
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void WriteString(BufferWriterBinaryOutput w, string value)
{
w.WriteStringUtf8(value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void WriteNullableString(BufferWriterBinaryOutput w, string? value)
private static void WriteNullableString(ref BufferWriterBinaryOutput bw, string? value)
{
if (value == null)
{
w.WriteByte(0); // null marker
bw.WriteByte(0);
return;
}
w.WriteByte(1); // present marker
w.WriteStringUtf8(value);
bw.WriteByte(1);
bw.WriteStringUtf8(value);
}
private static void WriteStringArray(BufferWriterBinaryOutput w, string[]? array)
private static void WriteStringArray(ref BufferWriterBinaryOutput bw, string[]? array)
{
if (array == null || array.Length == 0)
{
w.WriteVarUInt(0);
bw.WriteVarUInt(0);
return;
}
w.WriteVarUInt((uint)array.Length);
bw.WriteVarUInt((uint)array.Length);
for (var i = 0; i < array.Length; i++)
w.WriteStringUtf8(array[i]);
bw.WriteStringUtf8(array[i]);
}
private static void WriteHeaders(BufferWriterBinaryOutput w, IDictionary<string, string>? headers)
private static void WriteHeaders(ref BufferWriterBinaryOutput bw, IDictionary<string, string>? headers)
{
if (headers == null || headers.Count == 0)
{
w.WriteVarUInt(0);
bw.WriteVarUInt(0);
return;
}
w.WriteVarUInt((uint)headers.Count);
bw.WriteVarUInt((uint)headers.Count);
foreach (var kv in headers)
{
w.WriteStringUtf8(kv.Key);
w.WriteStringUtf8(kv.Value);
bw.WriteStringUtf8(kv.Key);
bw.WriteStringUtf8(kv.Value);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int VarUIntSize(uint value)
{
if (value < 0x80) return 1;
if (value < 0x4000) return 2;
if (value < 0x200000) return 3;
if (value < 0x10000000) return 4;
return 5;
}
#endregion
#region Helpers
@ -552,6 +561,14 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte ReadByte() => _span[_pos++];
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int ReadInt32()
{
var value = Unsafe.ReadUnaligned<int>(ref Unsafe.AsRef(in _span[_pos]));
_pos += 4;
return value;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long ReadInt64()
{
@ -589,7 +606,7 @@ public sealed class AcBinaryHubProtocol : IHubProtocol
if (byteCount == 0)
return string.Empty;
var bytes = ReadSpan(byteCount);
return System.Text.Encoding.UTF8.GetString(bytes);
return Encoding.UTF8.GetString(bytes);
}
public string? ReadNullableString()

View File

@ -69,6 +69,11 @@ namespace AyCode.Services.SignalRs
HubConnection = null;
}
public virtual IAsyncEnumerable<byte[]> OnReceiveStreamMessage(int messageTag, byte[]? messageBytes)
{
throw new NotSupportedException("Client does not support serving streams to the server. Streams are established Server-to-Client only.");
}
private Task HubConnection_Closed(Exception? arg)
{
if (_responseByRequestId.IsEmpty) Logger.DebugConditional("Client HubConnection_Closed");
@ -187,6 +192,49 @@ namespace AyCode.Services.SignalRs
public virtual Task GetAllAsync<TResponseData>(int messageTag, Func<SignalResponseDataMessage, Task> responseCallback, object[]? contextParams)
=> SendMessageToServerAsync(messageTag, contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage<IdMessage>(new IdMessage(contextParams)), responseCallback);
public virtual async IAsyncEnumerable<TResponseData?> StreamAllAsync<TResponseData>(int messageTag, object[]? contextParams = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await StartConnection();
if (HubConnection == null || !IsConnected())
{
Logger.Error($"Client StreamAllAsync error! ConnectionState: {GetConnectionState()};");
yield break;
}
var message = contextParams == null || contextParams.Length == 0 ? null : new SignalPostJsonDataMessage<IdMessage>(new IdMessage(contextParams));
var msgBytes = message != null ? SignalRSerializationHelper.SerializeToBinary(message) : null;
var stream = HubConnection.StreamAsync<byte[]>(
nameof(IAcSignalRHubBase.OnReceiveStreamMessage),
messageTag,
msgBytes,
cancellationToken);
await foreach (var bytes in stream.WithCancellation(cancellationToken))
{
if (bytes == null) continue;
if (typeof(TResponseData) == typeof(byte[]))
{
yield return (TResponseData)(object)bytes;
continue;
}
var responseMessage = SignalRSerializationHelper.DeserializeFromBinary<SignalResponseDataMessage>(bytes);
if (responseMessage != null)
{
if (responseMessage.Status == SignalResponseStatus.Error)
{
var errorText = $"Client StreamAllAsync error; tag: {messageTag}; Status: {responseMessage.Status}";
Logger.Error(errorText);
throw new Exception(errorText);
}
yield return responseMessage.GetResponseData<TResponseData>();
}
}
}
public virtual Task<TPostData?> PostDataAsync<TPostData>(int messageTag, TPostData postData) where TPostData : class
=> SendMessageToServerAsync<TPostData>(messageTag, CreatePostMessage(postData), GetNextRequestId());
@ -217,6 +265,49 @@ namespace AyCode.Services.SignalRs
return SendMessageToServerAsync(messageTag, CreatePostMessage(postData), requestId);
}
public virtual async IAsyncEnumerable<TResponseData?> StreamPostDataAsync<TPostData, TResponseData>(int messageTag, TPostData postData, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await StartConnection();
if (HubConnection == null || !IsConnected())
{
Logger.Error($"Client StreamPostDataAsync error! ConnectionState: {GetConnectionState()};");
yield break;
}
var message = CreatePostMessage(postData);
var msgBytes = SignalRSerializationHelper.SerializeToBinary(message);
var stream = HubConnection.StreamAsync<byte[]>(
nameof(IAcSignalRHubBase.OnReceiveStreamMessage),
messageTag,
msgBytes,
cancellationToken);
await foreach (var bytes in stream.WithCancellation(cancellationToken))
{
if (bytes == null) continue;
if (typeof(TResponseData) == typeof(byte[]))
{
yield return (TResponseData)(object)bytes;
continue;
}
var responseMessage = SignalRSerializationHelper.DeserializeFromBinary<SignalResponseDataMessage>(bytes);
if (responseMessage != null)
{
if (responseMessage.Status == SignalResponseStatus.Error)
{
var errorText = $"Client StreamPostDataAsync error; tag: {messageTag}; Status: {responseMessage.Status}";
Logger.Error(errorText);
throw new Exception(errorText);
}
yield return responseMessage.GetResponseData<TResponseData>();
}
}
}
private static ISignalRMessage CreatePostMessage<TPostData>(TPostData postData)
{
var type = typeof(TPostData);

View File

@ -4,4 +4,6 @@ public interface IAcSignalRHubBase
{
//Task OnRequestMessage(int messageTag, int requestId);
Task OnReceiveMessage(int messageTag, byte[] messageBytes, int? requestId);
IAsyncEnumerable<byte[]> OnReceiveStreamMessage(int messageTag, byte[]? messageBytes);
}

View File

@ -7,7 +7,7 @@ Custom binary SignalR protocol, client infrastructure, message tagging, and seri
## Key Files
### Protocol
- **`AcBinaryHubProtocol.cs`** — Custom `IHubProtocol` replacing JSON+Base64 with `AcBinarySerializer`. Handles all 9 SignalR message types (Invocation, StreamItem, Completion, Ping, Close, etc.). Inner `SpanReader` ref struct for zero-alloc parsing.
- **`AcBinaryHubProtocol.cs`** — Custom `IHubProtocol` replacing JSON+Base64 with `AcBinarySerializer`. Handles all 9 SignalR message types (Invocation, StreamItem, Completion, Ping, Close, etc.). Uses `BufferWriterBinaryOutput` standalone mode for zero-copy writes to the SignalR pipe. `byte[]` fast-path bypasses the serializer entirely. Inner `SpanReader` ref struct for zero-alloc parsing.
### Client
- **`AcSignalRClientBase.cs`** — Abstract SignalR client managing `HubConnection`, request/response tracking via pooled `SignalRRequestModel`. Methods: `SendMessageToServerAsync<TResponse>()`, CRUD helpers (Post, Get, GetAll, GetAllInto). Configurable timeouts.

View File

@ -101,7 +101,13 @@ Custom `IHubProtocol` (name: `"acbinary"`), replaces default JSON. Frame format:
Message types: Invocation(1), StreamItem(2), Completion(3), Ping(6), Close(7), Ack(8), Sequence(9).
Arguments serialized individually with VarUInt length prefix. Direct write to `IBufferWriter` via `BufferWriterBinaryOutput`.
Arguments serialized individually with INT32 length prefix (patched in-place after payload is written).
**Zero-copy write pipeline:**
All writes go through a single `BufferWriterBinaryOutput` in standalone mode (cached chunk pattern, zero virtual dispatch). For argument payloads, the BWO flushes to the pipe via `FlushAndReset()`, then `AcBinarySerializer.Serialize()` writes directly to the `IBufferWriter` (pipe) — zero-copy, no intermediate `byte[]` allocation.
**Raw `byte[]` Fast-Path:**
When an argument is a pure `byte[]`, the size is known upfront. The protocol writes `BinaryTypeCode.ByteArray` (68) marker, `VarUInt` length, and raw bytes entirely through the `BufferWriterBinaryOutput`, completely skipping the `AcBinarySerializer` context, the two-phase scan, and the internal array pools.
### Response Message

View File

@ -1 +1,17 @@
CRITICAL: Your FIRST action in every session MUST be reading `.github/copilot-instructions.md`. Execute ALL session-start instructions found there before responding to any user query. It is the single source of truth for this repo.
## SEQUENTIAL EXECUTION OVERRIDE
The AI AGENT CORE PROTOCOL in copilot-instructions.md requires STRICT SEQUENTIAL execution. This OVERRIDES your default parallelization behavior. Do NOT parallelize doc reads with code searches. The sequence is:
1. Read copilot-instructions.md → process its rules FULLY
2. Read ALL docs/ .md files listed in the protocol → wait for completion
3. Output [LOADED_DOCS: ...] prefix
4. ONLY THEN respond to the user's query or search code
## Tool mapping for AI AGENT CORE PROTOCOL
The copilot-instructions.md references Copilot tool names. Map them to Claude Code tools:
- `get_file` / `file_search``Read`, `Glob`, `Grep`
- `code_search` / `get_symbols_by_name` / `find_symbol``Grep`, `Glob`
- `replace_string_in_file` / `edit_file``Edit`
- `create_file``Write`
Follow the protocol using YOUR tools. The rules (LOADED_DOCS prefix, hard-gate, no-re-read, context recovery, explicit consent) apply equally to Claude Code.

View File

@ -74,7 +74,7 @@ For full architecture see `AyCode.Services/docs/SIGNALR.md`.
| **Message Tag** | Integer identifier mapping to a method via `[SignalR(tag)]` or `[SignalRSendToClient(tag)]` attributes. |
| **DynamicMethodRegistry** | Resolves message tags to `MethodInfo` at runtime. Static `ConcurrentDictionary` cache with lazy scan on miss. |
| **SignalRCrudTags** | Sealed class bundling 5 independent tag integers (getAllTag, getItemTag, addTag, updateTag, removeTag) for entity CRUD. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`. |
| **AcBinaryHubProtocol** | Custom `IHubProtocol` replacing SignalR's JSON+Base64 with `AcBinarySerializer`. Protocol name: `"acbinary"`. |
| **AcBinaryHubProtocol** | Custom `IHubProtocol` replacing SignalR's JSON+Base64 with `AcBinarySerializer`. Protocol name: `"acbinary"`. Uses `BufferWriterBinaryOutput` for zero-copy writes to the SignalR pipe. |
| **SignalResponseDataMessage** | Response message supporting Binary or JSON+GZip. Responses use pure Binary (no JSON overhead). |
| **SignalPostJsonDataMessage** | ⚠️ TECH DEBT — request params serialized to JSON inside Binary envelope. Planned for pure Binary replacement. |
| **AcSignalRDataSource** | Generic real-time `IList<T>` with change tracking, CRUD via SignalRCrudTags, binary merge, rollback, sync state. |