Refactor SignalR protocol for zero-copy, typed deserialization
- Change OnReceiveMessage signature to use `object data` (was `SignalData`), enabling type-aware and raw byte[] payloads. - Implement three-path argument deserialization: byte[] fast-path, IsRawBytesData, and eager typed deserialization via SignalDataType. - Add SignalDataType and IsRawBytesData fields to SignalParams for protocol guidance. - Write path now uses AcBinarySerializer zero-copy to pipe; byte[] uses fast-path. - SequenceBinaryInput now dynamically sizes scratch buffer for large cross-segment reads. - Deserializer now advances segments before throwing end-of-buffer, improving multi-segment support. - Set client logging to Debug for better diagnostics. - Update all docs and markdown to reflect new protocol, dispatch model, and field semantics. - AyCodeBinaryHubProtocol is now an empty derived class for registration/future hooks; SignalData is no longer the primary payload type. - SignalResponseDataMessage is now an internal DTO with RawResponseData as object? (typed or byte[]), and GetResponseData<T>() is a direct cast.
This commit is contained in:
parent
05808d0d13
commit
91194fcfa3
|
|
@ -28,7 +28,13 @@ public static partial class AcBinaryDeserializer
|
||||||
public bool IsAtEnd
|
public bool IsAtEnd
|
||||||
{
|
{
|
||||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||||
get => _position >= _bufferLength;
|
get
|
||||||
|
{
|
||||||
|
if (_position < _bufferLength) return false;
|
||||||
|
// Multi-segment: try advancing to next segment before declaring end.
|
||||||
|
// ArrayBinaryInput: TryAdvanceSegment => false (JIT eliminates, same as before).
|
||||||
|
return !Input.TryAdvanceSegment(ref _buffer, ref _position, ref _bufferLength, 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int Position
|
public int Position
|
||||||
|
|
@ -44,6 +50,9 @@ public static partial class AcBinaryDeserializer
|
||||||
{
|
{
|
||||||
if (_position >= _bufferLength)
|
if (_position >= _bufferLength)
|
||||||
{
|
{
|
||||||
|
// Multi-segment: try advancing to next segment before giving up.
|
||||||
|
// ArrayBinaryInput: TryAdvanceSegment => false (JIT eliminates this branch).
|
||||||
|
if (!Input.TryAdvanceSegment(ref _buffer, ref _position, ref _bufferLength, 1))
|
||||||
throw new AcBinaryDeserializationException("Unexpected end of binary payload.", _position);
|
throw new AcBinaryDeserializationException("Unexpected end of binary payload.", _position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -55,6 +64,8 @@ public static partial class AcBinaryDeserializer
|
||||||
{
|
{
|
||||||
if (_position >= _bufferLength)
|
if (_position >= _bufferLength)
|
||||||
{
|
{
|
||||||
|
// Multi-segment: try advancing to next segment before giving up.
|
||||||
|
if (!Input.TryAdvanceSegment(ref _buffer, ref _position, ref _bufferLength, 1))
|
||||||
throw new AcBinaryDeserializationException("Unexpected end of binary payload.", _position);
|
throw new AcBinaryDeserializationException("Unexpected end of binary payload.", _position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -210,6 +221,14 @@ public static partial class AcBinaryDeserializer
|
||||||
{
|
{
|
||||||
//if (FastWire) { return ReadRaw<uint>(); }
|
//if (FastWire) { return ReadRaw<uint>(); }
|
||||||
|
|
||||||
|
// Multi-segment safety: ensure at least 1 byte before direct buffer access.
|
||||||
|
// ArrayBinaryInput: TryAdvanceSegment => false (JIT eliminates this branch).
|
||||||
|
if (_position >= _bufferLength)
|
||||||
|
{
|
||||||
|
if (!Input.TryAdvanceSegment(ref _buffer, ref _position, ref _bufferLength, 1))
|
||||||
|
throw new AcBinaryDeserializationException("Unexpected end of binary payload.", _position);
|
||||||
|
}
|
||||||
|
|
||||||
// Fast path: single byte (0-127) - ~70% of cases
|
// Fast path: single byte (0-127) - ~70% of cases
|
||||||
var b0 = _buffer[_position];
|
var b0 = _buffer[_position];
|
||||||
if ((b0 & 0x80) == 0)
|
if ((b0 & 0x80) == 0)
|
||||||
|
|
@ -229,7 +248,7 @@ public static partial class AcBinaryDeserializer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path: 3+ bytes - ~5% of cases
|
// Slow path: 3+ bytes or cross-segment boundary — uses ReadByte() per byte
|
||||||
return ReadVarUIntSlow();
|
return ReadVarUIntSlow();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ namespace AyCode.Core.Serializers.Binaries;
|
||||||
/// Processes segments one-by-one without linearizing the entire payload.
|
/// Processes segments one-by-one without linearizing the entire payload.
|
||||||
///
|
///
|
||||||
/// For values that span segment boundaries (e.g. a 4-byte int split across 2 segments),
|
/// For values that span segment boundaries (e.g. a 4-byte int split across 2 segments),
|
||||||
/// copies the overlapping bytes into a small scratch buffer and reads from there.
|
/// copies the overlapping bytes into a scratch buffer and reads from there.
|
||||||
///
|
///
|
||||||
/// Mirrors BufferWriterBinaryOutput pattern from the serializer side.
|
/// Mirrors BufferWriterBinaryOutput pattern from the serializer side.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|
@ -21,9 +21,8 @@ public struct SequenceBinaryInput : IBinaryInputBase
|
||||||
private readonly ArraySegment<byte>[] _segments;
|
private readonly ArraySegment<byte>[] _segments;
|
||||||
private int _currentSegment;
|
private int _currentSegment;
|
||||||
|
|
||||||
// Scratch buffer for cross-boundary reads (max 16 bytes for Guid/Decimal)
|
// Scratch buffer for cross-boundary reads — dynamically sized for large reads (strings, byte arrays)
|
||||||
private byte[]? _scratchBuffer;
|
private byte[]? _scratchBuffer;
|
||||||
private int _scratchLength;
|
|
||||||
|
|
||||||
// After a cross-boundary read, the next TryAdvanceSegment must load
|
// After a cross-boundary read, the next TryAdvanceSegment must load
|
||||||
// the remainder of _currentSegment (already adjusted) without incrementing.
|
// the remainder of _currentSegment (already adjusted) without incrementing.
|
||||||
|
|
@ -58,7 +57,6 @@ public struct SequenceBinaryInput : IBinaryInputBase
|
||||||
|
|
||||||
_currentSegment = 0;
|
_currentSegment = 0;
|
||||||
_scratchBuffer = null;
|
_scratchBuffer = null;
|
||||||
_scratchLength = 0;
|
|
||||||
_afterCrossBoundary = false;
|
_afterCrossBoundary = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -128,23 +126,24 @@ public struct SequenceBinaryInput : IBinaryInputBase
|
||||||
if (_currentSegment >= _segments.Length)
|
if (_currentSegment >= _segments.Length)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
// Ensure scratch buffer is large enough (max 16 bytes for Guid/Decimal)
|
var nextSeg = _segments[_currentSegment];
|
||||||
_scratchBuffer ??= new byte[32];
|
var fromNext = Math.Min(needed - remaining, nextSeg.Count);
|
||||||
|
var scratchNeeded = remaining + fromNext;
|
||||||
|
|
||||||
|
// Dynamically size scratch buffer — handles large reads (strings, byte arrays)
|
||||||
|
if (_scratchBuffer == null || _scratchBuffer.Length < scratchNeeded)
|
||||||
|
_scratchBuffer = new byte[Math.Max(32, scratchNeeded)];
|
||||||
|
|
||||||
// Copy tail of current segment
|
// Copy tail of current segment
|
||||||
Buffer.BlockCopy(buffer, position, _scratchBuffer, 0, remaining);
|
Buffer.BlockCopy(buffer, position, _scratchBuffer, 0, remaining);
|
||||||
|
|
||||||
// Copy head of next segment
|
// Copy head of next segment
|
||||||
var nextSeg = _segments[_currentSegment];
|
|
||||||
var fromNext = Math.Min(needed - remaining, nextSeg.Count);
|
|
||||||
Buffer.BlockCopy(nextSeg.Array!, nextSeg.Offset, _scratchBuffer, remaining, fromNext);
|
Buffer.BlockCopy(nextSeg.Array!, nextSeg.Offset, _scratchBuffer, remaining, fromNext);
|
||||||
|
|
||||||
_scratchLength = remaining + fromNext;
|
|
||||||
|
|
||||||
// Set up context to read from scratch buffer
|
// Set up context to read from scratch buffer
|
||||||
buffer = _scratchBuffer;
|
buffer = _scratchBuffer;
|
||||||
position = 0;
|
position = 0;
|
||||||
bufferLength = _scratchLength;
|
bufferLength = scratchNeeded;
|
||||||
|
|
||||||
// Adjust the current segment to skip the bytes we already copied.
|
// Adjust the current segment to skip the bytes we already copied.
|
||||||
// The _afterCrossBoundary flag ensures the next TryAdvanceSegment
|
// The _afterCrossBoundary flag ensures the next TryAdvanceSegment
|
||||||
|
|
|
||||||
|
|
@ -73,8 +73,10 @@ await dataSource.LoadDataSourceFromResponseData(responseData, serializerType);
|
||||||
await dataSource.LoadItem(id); // single item by ID
|
await dataSource.LoadItem(id); // single item by ID
|
||||||
```
|
```
|
||||||
|
|
||||||
**Binary deserialization paths:**
|
**Deserialization paths:**
|
||||||
- `AcObservableCollection<T>`: `BeginUpdate()` → `BinaryToMerge()` → `EndUpdate()` — single batched UI notification.
|
- **Typed response** (`T != byte[]`): protocol eagerly deserializes via `SignalDataType` → `GetResponseData<T>()` direct cast.
|
||||||
|
- **Raw byte[] response** (`IsRawBytesData`): protocol returns raw `byte[]` → consumer deserializes:
|
||||||
|
- `AcObservableCollection<T>`: `BeginUpdate()` → `PopulateMerge(bytes)` → `EndUpdate()` — single batched UI notification.
|
||||||
- `List<T>`: `BinaryTo(InnerList)` — direct populate.
|
- `List<T>`: `BinaryTo(InnerList)` — direct populate.
|
||||||
|
|
||||||
**Context/Filtering:** `ContextIds` (object[]) and `FilterText` (string) are sent with every GetAll request for server-side filtering.
|
**Context/Filtering:** `ContextIds` (object[]) and `FilterText` (string) are sent with every GetAll request for server-side filtering.
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ Server-side SignalR hub infrastructure: method dispatch, session management, bro
|
||||||
## Server Processing
|
## Server Processing
|
||||||
|
|
||||||
```
|
```
|
||||||
6. OnReceiveMessage(tag, requestId, signalParams, SignalData data)
|
6. OnReceiveMessage(tag, requestId, signalParams, object data)
|
||||||
7. Extract parameterBytes from signalParams.Parameters
|
7. Extract parameterBytes from signalParams.Parameters
|
||||||
8. DynamicMethodRegistry.GetMethodByMessageTag(tag) <- ConcurrentDictionary lookup
|
8. DynamicMethodRegistry.GetMethodByMessageTag(tag) <- ConcurrentDictionary lookup
|
||||||
9. signalParams.GetParameterValues(paramInfos):
|
9. signalParams.GetParameterValues(paramInfos):
|
||||||
|
|
@ -18,11 +18,13 @@ Server-side SignalR hub infrastructure: method dispatch, session management, bro
|
||||||
|- Hub validates: missing required params throw ArgumentException
|
|- Hub validates: missing required params throw ArgumentException
|
||||||
'- NOTE: BinaryTo only -- JSON param deserialization not supported (needs JsonTo + project ref)
|
'- NOTE: BinaryTo only -- JSON param deserialization not supported (needs JsonTo + project ref)
|
||||||
10. MethodInfo.InvokeMethod(instance, params) <- unwraps Task/ValueTask
|
10. MethodInfo.InvokeMethod(instance, params) <- unwraps Task/ValueTask
|
||||||
11. CreateResponseMessage(tag, Success, result) <- Binary serialize payload -> byte[]
|
11. ResponseToCaller(tag, Success, responseData, requestId, signalParams):
|
||||||
12. SendMessageToClient(caller, tag, message, requestId):
|
12. SendMessageToClient(caller, tag, status, responseData, requestId, clientSignalParams):
|
||||||
|- Extract signalParams { Status, DataSerializerType } + SignalData from message
|
|- Build response SignalParams { Status, DataSerializerType, SignalDataType, IsRawBytesData }
|
||||||
'- caller.OnReceiveMessage(tag, requestId, signalParams, SignalData)
|
|- SignalDataType = responseData?.GetType().AssemblyQualifiedName (null if IsRawBytesData)
|
||||||
(metadata + payload as separate args -- no envelope serialization)
|
|- IsRawBytesData forwarded from client's SignalParams
|
||||||
|
'- caller.OnReceiveMessage(tag, requestId, signalParams, responseData)
|
||||||
|
Protocol zero-copy serializes responseData directly to pipe (no intermediate byte[])
|
||||||
13. If SendToOtherClientType != None:
|
13. If SendToOtherClientType != None:
|
||||||
'- SendMessageToOthers(sendToOtherClientTag, result) <- uses sendToOtherClientTag, not messageTag
|
'- SendMessageToOthers(sendToOtherClientTag, result) <- uses sendToOtherClientTag, not messageTag
|
||||||
```
|
```
|
||||||
|
|
@ -34,7 +36,7 @@ See also: `AyCode.Models.Server/DynamicMethods/README.md`
|
||||||
### Server-Side Lookup
|
### Server-Side Lookup
|
||||||
|
|
||||||
```
|
```
|
||||||
1. OnReceiveMessage(tag=100, requestId, signalParams, SignalData data)
|
1. OnReceiveMessage(tag=100, requestId, signalParams, object data)
|
||||||
|
|
||||||
2. DynamicMethodRegistry.GetMethodByMessageTag(100)
|
2. DynamicMethodRegistry.GetMethodByMessageTag(100)
|
||||||
|- Check static ConcurrentDictionary<int, (Type, AcMethodInfoModel)?> cache
|
|- Check static ConcurrentDictionary<int, (Type, AcMethodInfoModel)?> cache
|
||||||
|
|
@ -83,7 +85,7 @@ ConcurrentDictionary<TSessionItemId, TSessionItem> Sessions
|
||||||
| `SendMessageToUser(userId)` | User (all connections) |
|
| `SendMessageToUser(userId)` | User (all connections) |
|
||||||
| `SendMessageToUsers(userIds)` | Multiple users |
|
| `SendMessageToUsers(userIds)` | Multiple users |
|
||||||
|
|
||||||
All messages serialized to `SignalData` payload + `SignalParams` metadata (Parameters=null for server->client push) -> sent as separate hub arguments via `OnReceiveMessage` (no envelope wrapping). Server wraps `byte[]` in non-pooled `SignalData`; client receives as `ArrayPool`-backed `SignalData` via `AyCodeBinaryHubProtocol`.
|
All messages use the same `SendMessageToClient` path: build `SignalParams` (Status, DataSerializerType, SignalDataType) + pass `object responseData` as separate hub argument. Protocol zero-copy serializes `responseData` directly to the pipe.
|
||||||
|
|
||||||
## Hub Events
|
## Hub Events
|
||||||
|
|
||||||
|
|
@ -96,8 +98,6 @@ Enable with `AcWebSignalRHubBase.EnableBinaryDiagnostics = true`.
|
||||||
|
|
||||||
Logs: hex dump (500 byte sample), header parsing (version, marker), property count + names via VarUInt reading.
|
Logs: hex dump (500 byte sample), header parsing (version, marker), property count + names via VarUInt reading.
|
||||||
|
|
||||||
`SignalResponseDataMessage.DiagnosticLogger` -- per-response logging: target type info, property list, inheritance chain, hex dump. Uses `SignalData.Span` for zero-alloc diagnostics.
|
|
||||||
|
|
||||||
## Key Source Files
|
## Key Source Files
|
||||||
|
|
||||||
| Component | Path |
|
| Component | Path |
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ namespace AyCode.Services.SignalRs
|
||||||
.ConfigureLogging(logging =>
|
.ConfigureLogging(logging =>
|
||||||
{
|
{
|
||||||
// alap minimális MS log level
|
// alap minimális MS log level
|
||||||
logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Error);
|
logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Debug);
|
||||||
|
|
||||||
// regisztráljuk az AcLoggerProvider-t úgy, hogy visszaadja a meglévő Logger példányt
|
// regisztráljuk az AcLoggerProvider-t úgy, hogy visszaadja a meglévő Logger példányt
|
||||||
logging.AddAcLogger(_ => Logger);
|
logging.AddAcLogger(_ => Logger);
|
||||||
|
|
|
||||||
File diff suppressed because one or more lines are too long
|
|
@ -15,7 +15,7 @@ Client ◄──OnReceiveMessage(tag, requestId, signalParams, data)── Serve
|
||||||
```
|
```
|
||||||
|
|
||||||
Tag (int) determines server method. All calls go through `OnReceiveMessage`.
|
Tag (int) determines server method. All calls go through `OnReceiveMessage`.
|
||||||
Metadata (`SignalParams`) and payload (`SignalData`) travel as **separate hub arguments** — `SignalData` wraps pooled `byte[]` from `ArrayPool` via `AyCodeBinaryHubProtocol` (zero-copy fast-path), metadata is AcBinary serialized normally.
|
Metadata (`SignalParams`) and payload (`object data`) travel as **separate hub arguments** — `SignalParams` is AcBinary serialized normally, `data` is serialized directly to the pipe via `AcBinarySerializer` (zero-copy write) or passed through as `byte[]` via protocol fast-path.
|
||||||
|
|
||||||
```
|
```
|
||||||
Client: Server:
|
Client: Server:
|
||||||
|
|
@ -70,9 +70,9 @@ CRUD helpers (`PostAsync`, `GetByIdAsync`, `GetAllAsync`, `PostDataAsync`) are g
|
||||||
|
|
||||||
### AcBinaryHubProtocol / AyCodeBinaryHubProtocol
|
### AcBinaryHubProtocol / AyCodeBinaryHubProtocol
|
||||||
|
|
||||||
Custom `IHubProtocol` (`"acbinary"`), replaces JSON. Zero-copy via `BufferWriterBinaryOutput` standalone mode. `byte[]` and `SignalData` args bypass serializer.
|
Custom `IHubProtocol` (`"acbinary"`), replaces JSON. Zero-copy write via `BufferWriterBinaryOutput` standalone mode + `AcBinarySerializer.Serialize(value, output)` directly to pipe. Zero-copy read via `SequenceReader<byte>` from pipe's `ReadOnlySequence`.
|
||||||
|
|
||||||
`AcBinaryHubProtocol` is the base (unsealed, generic). `AyCodeBinaryHubProtocol` derives from it and uses `ArrayPool` for `SignalData` arguments — the `CreateByteArrayResult` hook rents from pool instead of `.ToArray()`. Register `AyCodeBinaryHubProtocol` in both client and server.
|
`AcBinaryHubProtocol` is the base (unsealed, generic). `AyCodeBinaryHubProtocol` derives from it (currently empty — exists for registration and future project-specific hooks). Register `AyCodeBinaryHubProtocol` in both client and server.
|
||||||
|
|
||||||
> Wire format, argument framing, dual BWO pattern, length prefix patching: `SIGNALR_BINARY_PROTOCOL.md`
|
> Wire format, argument framing, dual BWO pattern, length prefix patching: `SIGNALR_BINARY_PROTOCOL.md`
|
||||||
|
|
||||||
|
|
@ -85,26 +85,29 @@ Custom `IHubProtocol` (`"acbinary"`), replaces JSON. Zero-copy via `BufferWriter
|
||||||
| `Status` | SignalResponseStatus | Success/Error |
|
| `Status` | SignalResponseStatus | Success/Error |
|
||||||
| `DataSerializerType` | AcSerializerType | Binary or JsonGZip — tells client how to deserialize response data |
|
| `DataSerializerType` | AcSerializerType | Binary or JsonGZip — tells client how to deserialize response data |
|
||||||
| `Parameters` | `byte[]?` | Serialized `byte[][]` as single `byte[]` (protocol fast-path). Null when no parameters. |
|
| `Parameters` | `byte[]?` | Serialized `byte[][]` as single `byte[]` (protocol fast-path). Null when no parameters. |
|
||||||
|
| `SignalDataType` | `string?` | `AssemblyQualifiedName` of response object type. Server sets before sending. Protocol uses this for eager type-aware deserialization. Null for raw byte[] responses. |
|
||||||
|
| `IsRawBytesData` | `bool` | Client sets true when `T == byte[]` (e.g. DataSource populate/merge). Protocol returns raw byte[] without deserialization. |
|
||||||
|
|
||||||
Typed access via methods (PostDataJson pattern):
|
Typed access via methods (PostDataJson pattern):
|
||||||
- **Client**: `SetParameterValues(object[])` — packs each param via `ToBinary()` → `byte[][]` → `byte[]`
|
- **Client**: `SetParameterValues(object[])` — packs each param via `ToBinary()` → `byte[][]` → `byte[]`
|
||||||
- **Server**: `GetParameterValues(ParameterInfo[])` — unpacks `byte[]` → `byte[][]` → per-element `BinaryTo(targetType)`
|
- **Server**: `GetParameterValues(ParameterInfo[])` — unpacks `byte[]` → `byte[][]` → per-element `BinaryTo(targetType)`
|
||||||
- Protocol never sees `byte[][]` — only `byte[]`.
|
- Protocol never sees `byte[][]` — only `byte[]`.
|
||||||
|
|
||||||
`SignalData data` (separate hub argument, protocol fast-path, ArrayPool-backed via `AyCodeBinaryHubProtocol`).
|
`object data` (4th hub argument) — protocol handles three cases on read:
|
||||||
|
1. **byte[] fast-path**: first byte is `BinaryTypeCode.ByteArray(0x44)` → strip tag + VarUInt length → return raw payload bytes. No deserializer.
|
||||||
`SignalData` wraps pooled `byte[]` with `IDisposable` lifecycle. Consumer accesses via `Span` (zero-copy) or `ToArray()` (copy, rare). `Dispose()` returns rented buffer to `ArrayPool` with `clearArray: true`.
|
2. **IsRawBytesData**: `SignalParams.IsRawBytesData == true` → return entire argSlice as raw `byte[]`. No deserialization. Consumer handles deserialization.
|
||||||
|
3. **Typed deserialization**: resolve type from `SignalParams.SignalDataType` → `AcBinaryDeserializer.Deserialize(sequence, type)` → return typed object.
|
||||||
|
|
||||||
`Parameters` and `data` are **independent** — both can be null or filled in any direction (SignalR is bidirectional).
|
`Parameters` and `data` are **independent** — both can be null or filled in any direction (SignalR is bidirectional).
|
||||||
|
|
||||||
| Combination | Parameters | data | Example |
|
| Combination | Parameters | data | Example |
|
||||||
|------------|-----------|------|---------|
|
|------------|-----------|------|---------|
|
||||||
| Request | `byte[]` (packed params) | null/empty | client calls server method |
|
| Request | `byte[]` (packed params) | null/empty | client calls server method |
|
||||||
| Response | null | SignalData (response payload) | server returns result |
|
| Response | null | typed object or byte[] | server returns result |
|
||||||
| Request + data | `byte[]` | SignalData | client responds to server with data |
|
| Request + data | `byte[]` | typed object | client responds to server with data |
|
||||||
| Signal | null | null/empty | ping, status change, broadcast trigger |
|
| Signal | null | null/empty | ping, status change, broadcast trigger |
|
||||||
|
|
||||||
`SignalResponseDataMessage` remains as **internal DTO** for callback routing — constructed in-memory from `signalParams` + `data`, never serialized as envelope on wire. `ResponseData` is `SignalData?`. `GetResponseData<T>()` dispatches on `DataSerializerType`: Binary → `AcBinaryDeserializer.Deserialize<T>(Span)`, JsonGZip → decompress → `JsonTo<T>()`. `Dispose()` returns both SignalData and JSON decompression buffers to ArrayPool.
|
`SignalResponseDataMessage` is an **internal DTO** for client-side callback routing and stream wire format — constructed in-memory from `signalParams` + `data`, never serialized as envelope on wire. `RawResponseData` is `object?` (typed object or byte[]). `GetResponseData<T>()` performs direct cast.
|
||||||
|
|
||||||
## Request/Response Flow
|
## Request/Response Flow
|
||||||
|
|
||||||
|
|
@ -114,23 +117,23 @@ Typed access via methods (PostDataJson pattern):
|
||||||
1. PostAsync<T>(tag, param) / PostAsync<T>(tag, params[]) / PostDataAsync(tag, data, callback)
|
1. PostAsync<T>(tag, param) / PostAsync<T>(tag, params[]) / PostDataAsync(tag, data, callback)
|
||||||
2. signalParams.SetParameterValues(object[]):
|
2. signalParams.SetParameterValues(object[]):
|
||||||
Each param ToBinary() → byte[][] → ToBinary() → byte[] (single wire blob)
|
Each param ToBinary() → byte[][] → ToBinary() → byte[] (single wire blob)
|
||||||
3. SignalParams { Status = Success, Parameters = byte[] }
|
3. SignalParams { Status = Success, Parameters = byte[], IsRawBytesData = (typeof(T) == typeof(byte[])) }
|
||||||
4. HubConnection.SendAsync("OnReceiveMessage", tag, requestId, signalParams, null)
|
4. SendCoreAsync → HubConnection.SendAsync("OnReceiveMessage", tag, requestId, signalParams, null)
|
||||||
5. AyCodeBinaryHubProtocol frames on wire (signalParams via AcBinary, data = null for requests)
|
5. AyCodeBinaryHubProtocol frames on wire (signalParams via AcBinary, data = null for requests)
|
||||||
```
|
```
|
||||||
|
|
||||||
### Server → Client
|
### Server → Client
|
||||||
|
|
||||||
```
|
```
|
||||||
OnReceiveMessage(tag, requestId, signalParams, data)
|
OnReceiveMessage(tag, requestId, signalParams, object data)
|
||||||
├─ Construct SignalResponseDataMessage in-memory (no envelope deser):
|
├─ Construct SignalResponseDataMessage in-memory:
|
||||||
│ └─ { Status, DataSerializerType, ResponseData (SignalData) } from signalParams + data
|
│ └─ { MessageTag, Status, DataSerializerType, RawResponseData = data }
|
||||||
├─ Matching requestId in pending dict:
|
├─ Matching requestId in pending dict:
|
||||||
│ ├─ Route: null→sync wait, Action→invoke, Func<Task>→await
|
│ ├─ Route: null→sync wait, Action→invoke, Func<Task>→await
|
||||||
│ └─ GetResponseData<T>(): dispatches on DataSerializerType
|
│ └─ GetResponseData<T>(): direct cast (T)RawResponseData
|
||||||
│ Binary→Deserialize<T>(Span), JsonGZip→Decompress→JsonTo<T>()
|
│ Protocol already deserialized to correct type via SignalDataType
|
||||||
└─ No match (broadcast):
|
└─ No match (broadcast):
|
||||||
└─ abstract MessageReceived(tag, signalParams, SignalData data).Forget()
|
└─ abstract MessageReceived(tag, signalParams, object data).Forget()
|
||||||
```
|
```
|
||||||
|
|
||||||
Request pooling: `SignalRRequestModel` via `SignalRRequestModelPool` (ObjectPool + IResettable).
|
Request pooling: `SignalRRequestModel` via `SignalRRequestModelPool` (ObjectPool + IResettable).
|
||||||
|
|
@ -149,7 +152,7 @@ GetParameterValues(ParameterInfo[]):
|
||||||
|
|
||||||
Type-guided deserialization — each parameter is individually serialized/deserialized with its concrete type, avoiding the `object[]` → dictionary problem of untyped binary deserialization.
|
Type-guided deserialization — each parameter is individually serialized/deserialized with its concrete type, avoiding the `object[]` → dictionary problem of untyped binary deserialization.
|
||||||
|
|
||||||
**Perf concern:** Per-parameter `ToBinary()`/`BinaryTo(Type)` = N× context pool acquire/release + N× type-dispatch (ThreadLocal + ConcurrentDictionary cache). For many small primitives (int, bool, string) the per-call overhead may exceed a single bulk serialization. Complex objects benefit clearly. If benchmarks show regression vs old JSON path, a batch fast-path (single serialization context for all params) should be added.
|
**Perf concern:** Per-parameter `ToBinary()`/`BinaryTo(Type)` = N× context pool acquire/release + N× type-dispatch (ThreadLocal + ConcurrentDictionary cache). For many small primitives (int, bool, string) the per-call overhead may exceed a single bulk serialization call. Complex objects benefit clearly. If benchmarks show regression vs old JSON path, a batch fast-path (single serialization context for all params) should be added.
|
||||||
|
|
||||||
**Limitation:** Parameter serialization/deserialization is currently AcBinary only (`ToBinary()`/`BinaryTo()`). JSON support would require dispatching on serializer type in `SignalParams` methods + AcJsonSerializer reference.
|
**Limitation:** Parameter serialization/deserialization is currently AcBinary only (`ToBinary()`/`BinaryTo()`). JSON support would require dispatching on serializer type in `SignalParams` methods + AcJsonSerializer reference.
|
||||||
|
|
||||||
|
|
@ -179,7 +182,6 @@ Type-guided deserialization — each parameter is individually serialized/deseri
|
||||||
| Client base | `SignalRs/AcSignalRClientBase.cs` |
|
| Client base | `SignalRs/AcSignalRClientBase.cs` |
|
||||||
| Binary protocol (base) | `SignalRs/AcBinaryHubProtocol.cs` |
|
| Binary protocol (base) | `SignalRs/AcBinaryHubProtocol.cs` |
|
||||||
| Binary protocol (derived) | `SignalRs/AyCodeBinaryHubProtocol.cs` |
|
| Binary protocol (derived) | `SignalRs/AyCodeBinaryHubProtocol.cs` |
|
||||||
| Signal data wrapper | `SignalRs/SignalData.cs` |
|
|
||||||
| Tag attributes | `SignalRs/SignalMessageTagAttribute.cs` |
|
| Tag attributes | `SignalRs/SignalMessageTagAttribute.cs` |
|
||||||
| Base tags | `SignalRs/AcSignalRTags.cs` |
|
| Base tags | `SignalRs/AcSignalRTags.cs` |
|
||||||
| CRUD tags | `SignalRs/SignalRCrudTags.cs` |
|
| CRUD tags | `SignalRs/SignalRCrudTags.cs` |
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# SignalR Binary Protocol
|
# SignalR Binary Protocol
|
||||||
|
|
||||||
`AcBinaryHubProtocol` (unsealed base) — custom `IHubProtocol` (name: `"acbinary"`) replacing SignalR JSON+Base64 with `AcBinarySerializer`. `AyCodeBinaryHubProtocol` (derived) adds `ArrayPool`-backed `SignalData` creation via `CreateByteArrayResult` hook.
|
`AcBinaryHubProtocol` (unsealed base) — custom `IHubProtocol` (name: `"acbinary"`) replacing SignalR JSON+Base64 with `AcBinarySerializer`. `AyCodeBinaryHubProtocol` (derived, currently empty) exists for registration and future project-specific hooks.
|
||||||
|
|
||||||
> Architecture (tag system, dispatch, request/response): `SIGNALR.md`
|
> Architecture (tag system, dispatch, request/response): `SIGNALR.md`
|
||||||
> Output writers (cached chunk, buffer states, chunk sizing): `AyCode.Core/docs/BINARY_WRITERS.md`
|
> Output writers (cached chunk, buffer states, chunk sizing): `AyCode.Core/docs/BINARY_WRITERS.md`
|
||||||
|
|
@ -41,7 +41,7 @@ WriteMessage(HubMessage, IBufferWriter<byte> output)
|
||||||
│ ├─ WriteStringUtf8(invocationId, target)
|
│ ├─ WriteStringUtf8(invocationId, target)
|
||||||
│ ├─ WriteVarUInt(argCount)
|
│ ├─ WriteVarUInt(argCount)
|
||||||
│ ├─ Per argument:
|
│ ├─ Per argument:
|
||||||
│ │ ├─ byte[] → write through BWO (size known, no patching)
|
│ │ ├─ byte[] → byte[] fast-path through BWO (size known, no patching)
|
||||||
│ │ └─ object → FlushAndReset() → reserve INT32 arg prefix
|
│ │ └─ object → FlushAndReset() → reserve INT32 arg prefix
|
||||||
│ │ → AcBinarySerializer.Serialize(value, output) → patch prefix
|
│ │ → AcBinarySerializer.Serialize(value, output) → patch prefix
|
||||||
│ ├─ WriteStringArray(streamIds)
|
│ ├─ WriteStringArray(streamIds)
|
||||||
|
|
@ -77,26 +77,78 @@ Safe for `PipeWriter` — segments writable until `FlushAsync`.
|
||||||
|
|
||||||
**`GetMessageBytes` caveat:** `ArrayBufferWriter` initial capacity must include `LengthPrefixSize` to prevent resize after prefix reservation (stale span).
|
**`GetMessageBytes` caveat:** `ArrayBufferWriter` initial capacity must include `LengthPrefixSize` to prevent resize after prefix reservation (stale span).
|
||||||
|
|
||||||
## byte[] Fast-Path
|
## Write: byte[] Fast-Path
|
||||||
|
|
||||||
When argument is `byte[]`, bypasses serializer:
|
When argument is `byte[]`, bypasses serializer entirely — writes through BWO with known size:
|
||||||
1. Size upfront: `1 (BinaryTypeCode) + VarUIntSize(length) + length`
|
|
||||||
2. INT32 prefix written with actual value (no patching)
|
|
||||||
3. `BinaryTypeCode.ByteArray(68)` + VarUInt length + raw bytes via BWO
|
|
||||||
|
|
||||||
Read side mirrors: if first byte is `ByteArray(0x44)`, deserializer bypassed → direct `SpanReader` → `CreateByteArrayResult(span, targetType)`. Base returns `data.ToArray()`. `AyCodeBinaryHubProtocol` overrides: if `targetType == typeof(SignalData)`, rents from `ArrayPool` and returns `SignalData(rented, length, isRented: true)`. Detection is **wire-format only** (no targetType check for the marker) — ByteArray marker is unambiguous since no AcBinary object starts with 0x44 (version=1).
|
```
|
||||||
|
WriteArgument(byte[] value):
|
||||||
|
argPayload = 1 (BinaryTypeCode) + VarUIntSize(length) + length
|
||||||
|
Write INT32 argPayload (no patching needed — size known upfront)
|
||||||
|
Write BinaryTypeCode.ByteArray (0x44)
|
||||||
|
Write VarUInt length
|
||||||
|
Write raw bytes via BWO
|
||||||
|
```
|
||||||
|
|
||||||
Write side: `WriteArgument` handles both `byte[]` and `SignalData` via the same ByteArray wire format. `SignalData.Span` is written directly — same marker + VarUInt length + raw bytes.
|
## Write: Object Zero-Copy Path
|
||||||
|
|
||||||
## Read Path
|
When argument is any other object, serializes directly to the pipe (zero-copy):
|
||||||
|
|
||||||
`SpanReader` — `ref struct` for sequential `ReadOnlySpan<byte>` reading:
|
```
|
||||||
|
WriteArgument(object value):
|
||||||
|
FlushAndReset() BWO — hand pipe to serializer
|
||||||
|
Reserve INT32 arg length prefix on pipe
|
||||||
|
AcBinarySerializer.Serialize(value, output, options) — writes directly to pipe
|
||||||
|
Patch arg length prefix with actual bytes written
|
||||||
|
```
|
||||||
|
|
||||||
1. Read INT32 length. If `input.Length < total` → false (incomplete).
|
No intermediate `byte[]` — serializer writes to the pipe's `IBufferWriter` segments.
|
||||||
2. Multi-segment `ReadOnlySequence` → rent contiguous buffer from `ArrayPool`.
|
|
||||||
3. Parse message type → type-specific parser.
|
## Read: Three-Path Argument Deserialization
|
||||||
4. Fields via `SpanReader` methods (`ReadByte`, `ReadString`, `ReadVarUInt`, `ReadInt32`, `ReadInt64`, `ReadSpan`).
|
|
||||||
5. Arguments: INT32 length → slice → `AcBinaryDeserializer.Deserialize(span, targetType)`.
|
`ReadSingleArgument` reads `[INT32 argLength] [argBytes]` from the pipe's `ReadOnlySequence` via `SequenceReader<byte>`:
|
||||||
|
|
||||||
|
```
|
||||||
|
ReadSingleArgument(SequenceReader, targetType):
|
||||||
|
Read INT32 argLength
|
||||||
|
if argLength == 0 → return null
|
||||||
|
if argLength == 1 && first byte == 0 → return null (null marker)
|
||||||
|
|
||||||
|
argSlice = UnreadSequence.Slice(0, argLength) — zero-copy reference
|
||||||
|
Advance(argLength)
|
||||||
|
|
||||||
|
1. byte[] fast-path:
|
||||||
|
if first byte == BinaryTypeCode.ByteArray (0x44):
|
||||||
|
skip tag + VarUInt length → return payload as byte[]
|
||||||
|
Detection is wire-format only — 0x44 is unambiguous (no AcBinary object starts with it)
|
||||||
|
|
||||||
|
2. IsRawBytesData path:
|
||||||
|
if _currentSignalParams.IsRawBytesData == true:
|
||||||
|
return SequenceToByteArray(argSlice) — entire arg as raw byte[], no deserialization
|
||||||
|
Consumer (DataSource.PopulateMerge) handles deserialization
|
||||||
|
|
||||||
|
3. Typed deserialization:
|
||||||
|
if targetType == object && SignalDataType != null:
|
||||||
|
resolve Type from SignalDataType (AssemblyQualifiedName)
|
||||||
|
DeserializeFromSequence(argSlice, resolvedType, options)
|
||||||
|
→ AcBinaryDeserializer.Deserialize(ReadOnlySequence, Type)
|
||||||
|
→ single-segment: ArrayBinaryInput (zero-copy via TryGetArray)
|
||||||
|
→ multi-segment: SequenceBinaryInput (lazy iteration, no pre-allocation)
|
||||||
|
```
|
||||||
|
|
||||||
|
### SignalParams Capture
|
||||||
|
|
||||||
|
`_currentSignalParams` field captures the parsed `SignalParams` (arg[2]) during `ReadArguments`. The 4th arg (data) uses it for type-aware deserialization. Thread-safe: SignalR processes messages sequentially per connection.
|
||||||
|
|
||||||
|
### SequenceToByteArray
|
||||||
|
|
||||||
|
Zero-copy when possible: if single-segment and backing array matches exactly → return the array directly. Otherwise `ReadOnlySequence.ToArray()`.
|
||||||
|
|
||||||
|
### SequenceBinaryInput (Multi-Segment Deserialization)
|
||||||
|
|
||||||
|
`struct SequenceBinaryInput : IBinaryInputBase` — reads from `ReadOnlySequence<byte>` without linearizing. Lazy iteration via `ReadOnlySequence.TryGet` — zero constructor allocation, no pre-extracted segment array.
|
||||||
|
|
||||||
|
Cross-boundary reads (e.g. 4-byte int split across 2 segments) use a small scratch buffer (32 bytes). Remainder tracking via `_remainderArray/Offset/Count` — no segment array mutation.
|
||||||
|
|
||||||
## Config
|
## Config
|
||||||
|
|
||||||
|
|
@ -105,4 +157,4 @@ Write side: `WriteArgument` handles both `byte[]` and `SignalData` via the same
|
||||||
| `Options` | `AcBinarySerializerOptions.Default` | Serializer options (volatile, runtime-replaceable) |
|
| `Options` | `AcBinarySerializerOptions.Default` | Serializer options (volatile, runtime-replaceable) |
|
||||||
| `BufferWriterChunkSize` | 65536 | Chunk size for both BWOs |
|
| `BufferWriterChunkSize` | 65536 | Chunk size for both BWOs |
|
||||||
|
|
||||||
**Source:** `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` (base), `AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs` (derived, ArrayPool)
|
**Source:** `AyCode.Services/SignalRs/AcBinaryHubProtocol.cs` (base), `AyCode.Services/SignalRs/AyCodeBinaryHubProtocol.cs` (derived)
|
||||||
|
|
|
||||||
File diff suppressed because one or more lines are too long
|
|
@ -26,10 +26,10 @@
|
||||||
|
|
||||||
See `AyCode.Services/docs/SIGNALR.md` for full architecture documentation.
|
See `AyCode.Services/docs/SIGNALR.md` for full architecture documentation.
|
||||||
|
|
||||||
- **Single dispatch method** — all communication goes through `OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, SignalData data)`. Do not add new hub methods.
|
- **Single dispatch method** — all communication goes through `OnReceiveMessage(int messageTag, int? requestId, SignalParams signalParams, object data)`. Do not add new hub methods.
|
||||||
- **Tag-based routing** — associate methods with integer tags via `[SignalR(tag)]` (server) or `[SignalRSendToClient(tag)]` (client). Tags must be unique across the entire system.
|
- **Tag-based routing** — associate methods with integer tags via `[SignalR(tag)]` (server) or `[SignalRSendToClient(tag)]` (client). Tags must be unique across the entire system.
|
||||||
- **CRUD bundles** — entities use `SignalRCrudTags(getAllTag, getItemTag, addTag, updateTag, removeTag)` with 5 independent tag integers. Tags must be unique across the system. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`.
|
- **CRUD bundles** — entities use `SignalRCrudTags(getAllTag, getItemTag, addTag, updateTag, removeTag)` with 5 independent tag integers. Tags must be unique across the system. See `AyCode.Services.Server/docs/SIGNALR_DATASOURCE.md`.
|
||||||
- **Binary protocol** — `AyCodeBinaryHubProtocol` (derived from `AcBinaryHubProtocol`) is the transport protocol. Uses `ArrayPool`-backed `SignalData` for response payload. Responses use pure Binary serialization.
|
- **Binary protocol** — `AyCodeBinaryHubProtocol` (derived from `AcBinaryHubProtocol`) is the transport protocol. Zero-copy write: `AcBinarySerializer.Serialize(value, output)` directly to pipe. Zero-copy read: `SequenceReader<byte>` + type-aware deserialization via `SignalParams.SignalDataType`. Three read paths: byte[] fast-path (0x44 tag), IsRawBytesData (raw byte[]), typed deserialization.
|
||||||
|
|
||||||
### ⚠️ Temporary: JSON-in-Binary Request Parameters
|
### ⚠️ Temporary: JSON-in-Binary Request Parameters
|
||||||
|
|
||||||
|
|
|
||||||
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue