[LOADED_DOCS: 2 files, no new loads]

Remove PipeReader APIs from AcBinaryDeserializer

Refactored to remove all PipeReader-based async deserialization methods from AcBinaryDeserializer. Updated BINARY_TODO.md to clarify that draining PipeReader to AsyncPipeReaderInput is now a consumer responsibility. Refactored AcBinaryInputFormatter to inline the drain-loop and background deserialization, following new layering guidance. Updated comments and docs to reflect these changes.
This commit is contained in:
Loretta 2026-05-04 14:42:17 +02:00
parent e139eca389
commit 7d9cf10a6e
3 changed files with 26 additions and 60 deletions

View File

@ -5,7 +5,6 @@ using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.CompilerServices;
@ -375,57 +374,6 @@ public static partial class AcBinaryDeserializer
}
}
/// <summary>
/// Drains a <see cref="PipeReader"/> end-to-end into a fresh <see cref="AsyncPipeReaderInput"/>
/// and deserializes one message. Background <c>Task.Run</c> deserializes incrementally while
/// the calling thread drains the reader. For long-lived multi-message scenarios use the
/// <see cref="AsyncPipeReaderInput"/> overloads directly.
/// </summary>
public static async Task<T?> DeserializeFromPipeReaderAsync<[DynamicallyAccessedMembers(TypeMetadataBase.RequiredMembers)] T>(PipeReader reader, AcBinarySerializerOptions options, CancellationToken cancellationToken = default)
{
if (reader is null) throw new ArgumentNullException(nameof(reader));
using var input = new AsyncPipeReaderInput(options.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => Deserialize<T>(input, options), cancellationToken);
await DrainPipeReaderToInputAsync(reader, input, cancellationToken).ConfigureAwait(false);
return await deserTask.ConfigureAwait(false);
}
/// <summary>
/// Non-generic <c>Type</c>-based counterpart to <see cref="DeserializeFromPipeReaderAsync{T}"/>.
/// For runtime-typed scenarios (MVC formatters, plugin frameworks).
/// </summary>
public static async Task<object?> DeserializeFromPipeReaderAsync(PipeReader reader, [DynamicallyAccessedMembers(TypeMetadataBase.RequiredMembers)] Type targetType, AcBinarySerializerOptions options, CancellationToken cancellationToken = default)
{
if (reader is null) throw new ArgumentNullException(nameof(reader));
using var input = new AsyncPipeReaderInput(options.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => Deserialize(input, targetType, options), cancellationToken);
await DrainPipeReaderToInputAsync(reader, input, cancellationToken).ConfigureAwait(false);
return await deserTask.ConfigureAwait(false);
}
/// <summary>
/// Pumps a <see cref="PipeReader"/> into a <see cref="AsyncPipeReaderInput"/> via repeated
/// <see cref="AsyncPipeReaderInput.Feed"/> calls; signals <see cref="AsyncPipeReaderInput.Complete"/>
/// at end-of-stream (in finally so consumer always wakes up on cancellation / exception).
/// </summary>
private static async Task DrainPipeReaderToInputAsync(PipeReader reader, AsyncPipeReaderInput input, CancellationToken cancellationToken)
{
try
{
while (true)
{
var result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
foreach (var segment in result.Buffer) input.Feed(segment.Span);
reader.AdvanceTo(result.Buffer.End);
if (result.IsCompleted) break;
}
}
finally
{
input.Complete();
}
}
/// <summary>
/// Internal: Deserialize with any TInput (multi-segment or other future input types).
/// </summary>

View File

@ -371,13 +371,11 @@ Added in `AcBinarySerializer.cs`:
- `SerializeChunked(object?, Type, PipeWriter, opts)``int`
- `SerializeChunkedFramed(object?, Type, PipeWriter, opts)``int`
Added in `AcBinaryDeserializer.cs`:
- `DeserializeFromPipeReaderAsync<T>(PipeReader, opts, ct)``Task<T?>`
- `DeserializeFromPipeReaderAsync(PipeReader, Type, opts, ct)``Task<object?>`
`AcBinaryDeserializer.cs` already had `Deserialize(byte[], Type, opts)` / `Deserialize(ReadOnlySequence<byte>, Type, opts)` / `Deserialize(AsyncPipeReaderInput, Type, opts)` overloads — no new entries needed.
The `Deserialize(byte[], Type, opts)` / `Deserialize(ReadOnlySequence<byte>, Type, opts)` / `Deserialize(AsyncPipeReaderInput, Type, opts)` overloads already existed.
**Layering note**: `PipeReader → AsyncPipeReaderInput` drain-loop is the consumer's responsibility, not the binary serializer's. The serializer surface ends at `AsyncPipeReaderInput`; transport-specific draining (PipeReader, NamedPipe, SignalR `state.Buffer.Write`, etc.) lives in the consumer layer (e.g. `AcBinaryInputFormatter`, `AcBinaryHubProtocol.TryParseChunkData`).
Consumed by ASP.NET Core MVC formatter package (`AyCode.Services/Mvc/`) — `AcBinaryInputFormatter`, `AcBinaryOutputFormatter`, `AddAcBinaryFormatters` extension. Media type: `application/vnd.acbinary`.
Consumed by ASP.NET Core MVC formatter package (`AyCode.Services/Mvc/`) — `AcBinaryInputFormatter`, `AcBinaryOutputFormatter`, `AddAcBinaryFormatters` extension. Media type: `application/vnd.acbinary`. Drain-loop inlined in `AcBinaryInputFormatter.ReadRequestBodyAsync`.
Plugin frameworks, ASP.NET ModelBinding, DI middleware, and DataContractSerializer-style "generic-API container" use-cases need to serialize an `object` whose type is known only at runtime. Current AcBinary surface forces a reflection trampoline through the generic `Serialize<T>`:

View File

@ -7,8 +7,9 @@ namespace AyCode.Services.Mvc;
/// <summary>
/// ASP.NET Core MVC InputFormatter for AcBinary wire format. Reads request body via PipeReader,
/// drains into AsyncPipeReaderInput, deserializes to ModelType. Standard ProblemDetails error
/// flow on failure (ModelState.AddModelError → 400 + application/problem+json).
/// drains chunks into AsyncPipeReaderInput on the calling thread while a background task
/// deserializes incrementally, then returns the result. Standard ProblemDetails error flow on
/// failure (ModelState.AddModelError → 400 + application/problem+json).
/// </summary>
public class AcBinaryInputFormatter : InputFormatter
{
@ -31,9 +32,28 @@ public class AcBinaryInputFormatter : InputFormatter
var ct = context.HttpContext.RequestAborted;
var reader = PipeReader.Create(context.HttpContext.Request.Body);
try
{
var model = await AcBinaryDeserializer.DeserializeFromPipeReaderAsync(reader, context.ModelType, _options, ct).ConfigureAwait(false);
using var input = new AsyncPipeReaderInput(_options.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, context.ModelType, _options), ct);
try
{
while (true)
{
var result = await reader.ReadAsync(ct).ConfigureAwait(false);
foreach (var segment in result.Buffer) input.Feed(segment.Span);
reader.AdvanceTo(result.Buffer.End);
if (result.IsCompleted) break;
}
}
finally
{
input.Complete();
}
var model = await deserTask.ConfigureAwait(false);
return await InputFormatterResult.SuccessAsync(model).ConfigureAwait(false);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)