diff --git a/AyCode.Core.Serializers.Console/Program.cs b/AyCode.Core.Serializers.Console/Program.cs
index 1f6ef6b..fe28876 100644
--- a/AyCode.Core.Serializers.Console/Program.cs
+++ b/AyCode.Core.Serializers.Console/Program.cs
@@ -1,4 +1,5 @@
using AyCode.Core.Compression;
+using AyCode.Core.Serializers.Attributes;
using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.TestModels;
using MemoryPack;
@@ -9,6 +10,7 @@ using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.IO.Pipes;
+using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
@@ -64,12 +66,59 @@ public static class Program
private static readonly UTF8Encoding Utf8NoBom = new(encoderShouldEmitUTF8Identifier: false);
+ /// <summary>
+ /// Aggregated feature flags across every type tagged with the AcBinarySerializable attribute in the
+ /// currently loaded assemblies. Cached in a static readonly field (single reflection scan at type initialization).
+ /// Used by BuildAcBinaryOptionsDescription so the per-row Options column shows BOTH the
+ /// configured options-level value AND the effective attribute-level enable flag — a feature flagged
+ /// off at the type level overrides the options regardless of preset, and that asymmetry must surface
+ /// in the log to avoid misreading a "RefHandling=OnlyId" / "Interning=All" line as actually active.
+ /// Aggregation rule: if ALL tagged types have the feature enabled → true; if any tagged type disables
+ /// it, or no tagged type is found → false (a single disabling type suppresses the feature on the type-graph).
+ /// </summary>
+ private static readonly (bool refHandling, bool internString, bool metadata, bool idTracking) _attrFlags
+ = ScanAcBinaryAttributeFlags();
+
+ private static (bool refHandling, bool internString, bool metadata, bool idTracking) ScanAcBinaryAttributeFlags()
+ {
+ var attrs = AppDomain.CurrentDomain.GetAssemblies()
+ .SelectMany(a => { try { return a.GetTypes(); } catch { return Array.Empty(); } })
+ .Select(t => t.GetCustomAttribute())
+ .Where(a => a != null)
+ .ToList();
+
+ if (attrs.Count == 0) return (false, false, false, false);
+
+ return (
+ refHandling: attrs.All(a => a!.EnableRefHandlingFeature),
+ internString: attrs.All(a => a!.EnableInternStringFeature),
+ metadata: attrs.All(a => a!.EnableMetadataFeature),
+ idTracking: attrs.All(a => a!.EnableIdTrackingFeature));
+ }
+
+ /// <summary>
+ /// Common Options-column formatter for every AcBinary serializer benchmark row. Renders the
+ /// configured options-level value AND the effective attribute-level enable flag side-by-side
+ /// (e.g. <c>Interning=All(opt) | False (attr)</c>) so attribute-suppressed features cannot
+ /// silently mislead. Pass any benchmark-specific extras (e.g. <c>", BufferSize=4096B"</c>)
+ /// in <paramref name="extra"/> — they are appended verbatim after the common fields.
+ /// </summary>
+ private static string BuildAcBinaryOptionsDescription(AcBinarySerializerOptions options, string extra = "")
+ {
+ return $"WireMode={options.WireMode}, " +
+ $"RefHandling={options.ReferenceHandling}(opt) | {_attrFlags.refHandling} (attr), " +
+ $"Interning={options.UseStringInterning}(opt) | {_attrFlags.internString} (attr), " +
+ $"Metadata={options.UseMetadata}(opt) | {_attrFlags.metadata} (attr), " +
+ $"SGen={options.UseGeneratedCode}, " +
+ $"Compression={options.UseCompression}{extra}";
+ }
+
#if DEBUG
private const int WarmupIterations = 0;
private const int TestIterations = 1;
private const int BenchmarkSamples = 1; // Debug: single sample, fast iteration
#else
- private static int WarmupIterations = 1000; //5000
+ private static int WarmupIterations = 5000; //5000
private static int TestIterations = 1000; //1000
private static int BenchmarkSamples = 3;
#endif
@@ -83,16 +132,21 @@ public static class Program
// Done early so user is told immediately, not after warmup.
ValidateMemoryPackSetup();
- // Determine layer (which test data to run) and opMode (ser/des/all).
+ // Determine layer (which test data to run), opMode (ser/des/all), and serializerMode (standard/asyncpipe).
// CLI args take precedence; if no args, show interactive menu.
+ // serializerMode: "standard" = all serializers EXCEPT AsyncPipe; "asyncpipe" = ONLY the AsyncPipe streaming benchmark.
+ // The two are mutually exclusive — AsyncPipe never runs alongside the standard set, so its long-lived pipe
+ // setup / kernel-buffer overhead does not skew the steady-state Byte[] / IBufferWriter measurements.
string layer;
var opMode = "all";
+ var serializerMode = "standard";
if (args.Length == 0)
{
var selection = ShowInteractiveMenu();
if (selection == null) return; // user pressed Q
- layer = selection;
+ layer = selection.Value.layer;
+ serializerMode = selection.Value.serializerMode;
}
else
{
@@ -117,6 +171,12 @@ public static class Program
{
layer = arg;
}
+ else if (arg is "asyncpipe" or "pipe")
+ {
+ // AsyncPipe-only mode: streaming I/O isolation across all test data.
+ layer = "all";
+ serializerMode = "asyncpipe";
+ }
else if (arg is "ser" or "serialize")
{
opMode = "serialize";
@@ -137,11 +197,12 @@ public static class Program
System.Console.WriteLine("╔══════════════════════════════════════════════════════════════════════╗");
System.Console.WriteLine("║ COMPREHENSIVE SERIALIZER BENCHMARK SUITE ║");
System.Console.WriteLine("╚══════════════════════════════════════════════════════════════════════╝");
+
var allResults = new List();
var allTestDataSets = BenchmarkTestDataProvider.CreateTestDataSets();
var testDataSets = FilterByLayer(allTestDataSets, layer);
- System.Console.WriteLine($"Layer: {layer} | OpMode: {opMode} | Iterations: {TestIterations} | Warmup: {WarmupIterations} | Samples: {BenchmarkSamples} (median)");
+ System.Console.WriteLine($"Layer: {layer} | OpMode: {opMode} | SerializerMode: {serializerMode} | Iterations: {TestIterations} | Warmup: {WarmupIterations} | Samples: {BenchmarkSamples} (median)");
System.Console.WriteLine($"Build: {BuildConfiguration} | .NET: {Environment.Version} | Test Type: {testDataSets.FirstOrDefault()?.TypeName ?? "unknown"} | Test Cells: {testDataSets.Count}/{allTestDataSets.Count}");
System.Console.WriteLine();
@@ -157,7 +218,7 @@ public static class Program
System.Console.WriteLine($"Global JIT pre-warmup ({testDataSets.Count} cells × all serializers, light pass)...");
foreach (var testData in testDataSets)
{
- var preSerializers = CreateSerializers(testData);
+ var preSerializers = CreateSerializers(testData, serializerMode);
try
{
foreach (var s in preSerializers)
@@ -184,7 +245,7 @@ public static class Program
System.Console.WriteLine($"TEST DATA: {testData.DisplayName}");
System.Console.WriteLine($"{'═'.ToString().PadRight(70, '═')}");
- var results = RunBenchmarksForTestData(testData, opMode);
+ var results = RunBenchmarksForTestData(testData, opMode, serializerMode);
allResults.AddRange(results);
}
@@ -215,6 +276,7 @@ public static class Program
options.UseStringInterning = StringInterningMode.None;
var bytes = AcBinarySerializer.Serialize(order, options);
+
// Warmup (fills caches)
System.Console.WriteLine("Warming up (1000 iterations)...");
for (var i = 0; i < 1000; i++)
@@ -247,16 +309,17 @@ public static class Program
System.Console.WriteLine(">>> ATTACH MEMORY PROFILER NOW <<<");
System.Console.WriteLine("Press any key to exit...");
System.Console.ReadKey(intercept: true);
+
System.Console.WriteLine();
System.Console.WriteLine("✓ Profiler mode complete. Exiting now.");
}
#region Benchmark Execution
- private static List RunBenchmarksForTestData(TestDataSet testData, string mode)
+ private static List RunBenchmarksForTestData(TestDataSet testData, string mode, string serializerMode)
{
var results = new List();
- var serializers = CreateSerializers(testData);
+ var serializers = CreateSerializers(testData, serializerMode);
// Round-trip correctness check — once per (cell × serializer), BEFORE warmup. Aborts the entire benchmark on failure.
System.Console.WriteLine("Verifying round-trip correctness...");
@@ -322,11 +385,13 @@ public static class Program
// Dedicated alloc-only sample (separate from timing samples; keeps timing pure)
result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations);
}
+
if (mode is "all" or "deserialize" or "des")
{
result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations);
result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations);
}
+
// Compose RT from Ser+Des (the previously computed property's behavior, now explicit since RT is settable).
result.RoundTripTimeMs = result.SerializeTimeMs + result.DeserializeTimeMs;
result.RoundTripAllocBytesPerOp = result.SerializeAllocBytesPerOp + result.DeserializeAllocBytesPerOp;
@@ -343,8 +408,32 @@ public static class Program
return results;
}
- private static List CreateSerializers(TestDataSet testData)
+ private static List CreateSerializers(TestDataSet testData, string serializerMode)
{
+ // AsyncPipe-only mode — return ONLY the AsyncPipe streaming benchmark (no other serializer).
+ // Streaming I/O has long-lived pipe setup + kernel-buffer overhead that, when interleaved with
+ // the standard byte-array / IBufferWriter measurements, masks the steady-state numbers. Run it
+ // in isolation so the timing numbers reflect ONLY the streaming path.
+ if (serializerMode == "asyncpipe")
+ {
+ // NamedPipe — pipe-aligned chunk size for the long-lived IPC scenario. The chunkSize here
+ // drives the AsyncPipeWriterOutput's chunk-on-wire size (header + data, page-aligned thanks to
+ // the AcquireChunk fix) AND the kernel pipe buffer size (inBufferSize/outBufferSize on the
+ // NamedPipeServerStream ctor). Same value across both layers = one WriteFile(chunkSize) syscall
+ // fits blocking-free in one kernel pipe-buffer slot. Single source of truth for both app-level
+ // wire chunk AND kernel transfer unit; change ONLY this line when tuning.
+ var binaryFastModePipeChunkOnly = AcBinarySerializerOptions.FastMode;
+ binaryFastModePipeChunkOnly.BufferWriterChunkSize = 16_384; //AsyncPipeWriterOutput.MaxChunkSize;
+
+ return new List
+ {
+ new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeChunk)"),
+ };
+ }
+
+ // Standard mode — all serializers EXCEPT AsyncPipe (the streaming benchmark is opt-in via the
+ // AsyncPipe menu / CLI mode, never bundled with the steady-state suite).
+
var binaryNoInternOption = AcBinarySerializerOptions.Default;
binaryNoInternOption.UseStringInterning = StringInterningMode.None;
@@ -354,20 +443,17 @@ public static class Program
var binaryFastModeNoSgenOption = AcBinarySerializerOptions.FastMode;
binaryFastModeNoSgenOption.UseGeneratedCode = false;
- // Pipe-aligned max chunk size for the IBufferWriter / NamedPipe variants — matches
- // AsyncPipeWriterOutput.MaxChunkSize (UINT16 max = 65535), the largest payload that fits in one
- // [201][UINT16][data] wire frame. The same value also drives the kernel pipe buffer in the
- // NamedPipeServerStream ctor (inBufferSize/outBufferSize) so the app-level chunk and the
- // kernel-level transfer unit stay in sync — one WriteFile(chunkSize) syscall fits blocking-free in
- // one kernel pipe-buffer slot, eliminating the page-segmentation in-syscall stall that plagued
- // the previous 4 KB profile (where a 65 KB user-space chunk would still get sliced 16× inside
- // the kernel because the default kernel pipe buffer is page-sized).
- // Centralised here so ALL pipe-style benchmarks (BufWr new, NamedPipe) share a single source of
- // truth — change ONLY THIS line when tuning the pipe chunk size, never inside individual benchmark
- // ctors. Earlier 4 KB-baseline measurements remain comparable via the archived .LLM logs in
- // Test_Benchmark_Results/Benchmark/.
- var binaryFastModePipeChunk = AcBinarySerializerOptions.FastMode;
- //AsyncPipeWriterOutput.MaxChunkSize;
+ // BufWr new — 4 KB chunk size for the FRESH ArrayBufferWriter scenario. The chunkSize here drives
+ // the serializer's GetSpan(N) request → the ArrayBufferWriter's internal allocation per call.
+ // Small chunk = small per-call allocation, optimum for one-shot serialization where each iteration
+ // allocates a fresh ABW. Independent of the AsyncPipe profile (different mechanism: alloc overhead
+ // vs syscall count).
+ var binaryFastModeBufWrChunk = AcBinarySerializerOptions.FastMode;
+ binaryFastModeBufWrChunk.BufferWriterChunkSize = 4096;
+
+ var defaultOptions = AcBinarySerializerOptions.Default;
+ defaultOptions.UseStringInterning = StringInterningMode.None;
+ defaultOptions.ReferenceHandling = ReferenceHandlingMode.OnlyId;
return new List
{
@@ -379,7 +465,11 @@ public static class Program
// Fastest Byte[] — Runtime path (UseGeneratedCode=false). Same wire/options, no source-generated dispatch.
// Always paired with the SGen variant so every layer can compare the SGen speed-up apples-to-apples.
new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"),
- //new AcBinaryBenchmark(testData.Order, AcBinarySerializerOptions.Default, "Default"),
+ // Default-based Byte[] — RefHandling=OnlyId (deduplicates IId-shared references on the wire). Showcases the
+ // wire-size and CPU trade-off vs FastMode on the ~20% IId-ref / repeated-string test data.
+ // NOTE(review): defaultOptions sets UseStringInterning=None, so repeated strings are NOT deduplicated in this row — confirm intent.
+
+ new AcBinaryBenchmark(testData.Order, defaultOptions, "Default"),
//new AcBinaryBenchmark(testData.Order, binaryDefaultNoSgenOption, "Default"),
//new AcBinaryBenchmark(testData.Order, AcBinarySerializerOptions.WithoutReferenceHandling, "NoRef"),
//new AcBinaryBenchmark(testData.Order, binaryNoInternOption, "NoIntern"),
@@ -388,17 +478,12 @@ public static class Program
new AcBinaryBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode"),
// AcBinary via IBufferWriter (FRESH ArrayBufferWriter per call — one-shot scenario).
- // PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method).
- new AcBinaryFreshBufferWriterBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"),
+ // 4 KB chunk size from binaryFastModeBufWrChunk — minimises the per-call ArrayBufferWriter
+ // allocation. Optimum for this scenario.
+ new AcBinaryFreshBufferWriterBenchmark(testData.Order, binaryFastModeBufWrChunk, "FastMode (4KB BufWr)"),
- // AcBinary over a long-lived NamedPipe IPC connection — pipe set up ONCE, reused for every iteration.
- // PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method) —
- // same value drives BOTH the app-level wire chunk AND the kernel pipe buffer (inBufferSize/outBufferSize
- // in the NamedPipeServerStream ctor). Persistent connection + multi-message wire framing + max-size
- // chunks aligned with the kernel transfer unit. Single-process loopback, so the number is a lower bound
- // (real cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown
- // in RT ms, Ser/Des = N/A (IsRoundTripOnly).
- new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"),
+ // AsyncPipe streaming (AcBinaryNamedPipeBenchmark) is intentionally OMITTED here — run it via
+ // the dedicated AsyncPipe menu / CLI mode for isolated streaming-I/O measurements.
// ============================================================
// MemoryPack — three I/O modes for apples-to-apples comparison
@@ -431,6 +516,7 @@ public static class Program
// Single-sample fast path (Debug or trivial run) — no allocation, no sort.
var sw = Stopwatch.StartNew();
for (var i = 0; i < iterations; i++) action();
+
sw.Stop();
return sw.Elapsed.TotalMilliseconds;
}
@@ -440,14 +526,14 @@ public static class Program
{
var sw = Stopwatch.StartNew();
for (var i = 0; i < iterations; i++) action();
+
sw.Stop();
times[s] = sw.Elapsed.TotalMilliseconds;
}
Array.Sort(times);
+
// Median: middle value for odd sample counts, average of two middles for even counts.
- return samples % 2 == 1
- ? times[samples / 2]
- : (times[samples / 2 - 1] + times[samples / 2]) / 2.0;
+ return samples % 2 == 1 ? times[samples / 2] : (times[samples / 2 - 1] + times[samples / 2]) / 2.0;
}
///
@@ -458,8 +544,10 @@ public static class Program
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
+
var before = GC.GetAllocatedBytesForCurrentThread();
for (var i = 0; i < iterations; i++) action();
+
var after = GC.GetAllocatedBytesForCurrentThread();
return (after - before) / iterations;
}
@@ -477,8 +565,10 @@ public static class Program
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
+
var before = GC.GetTotalAllocatedBytes(precise: true);
for (var i = 0; i < iterations; i++) action();
+
var after = GC.GetTotalAllocatedBytes(precise: true);
return (after - before) / iterations;
}
@@ -486,6 +576,7 @@ public static class Program
private static readonly JsonSerializerOptions VerifyJsonOpts = new()
{
WriteIndented = false,
+
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
ReferenceHandler = System.Text.Json.Serialization.ReferenceHandler.IgnoreCycles
};
@@ -498,8 +589,10 @@ public static class Program
{
if (a == null && b == null) return true;
if (a == null || b == null) return false;
+
var jsonA = JsonSerializer.Serialize(a, VerifyJsonOpts);
var jsonB = JsonSerializer.Serialize(b, VerifyJsonOpts);
+
return jsonA == jsonB;
}
@@ -510,6 +603,7 @@ public static class Program
private static void ValidateMemoryPackSetup()
{
var typesToCheck = new[] { typeof(TestOrder) };
+
foreach (var type in typesToCheck)
{
var hasAttr = type.GetCustomAttributes(typeof(MemoryPackableAttribute), inherit: true).Any();
@@ -517,6 +611,7 @@ public static class Program
{
System.Console.Error.WriteLine($"❌ FATAL: {type.FullName} is not [MemoryPackable] — MemoryPack would fall back to runtime resolver, comparison is INVALID for SGen-vs-SGen claim.");
System.Console.Error.WriteLine("Add [MemoryPackable] to the type and any nested types referenced from it.");
+
Environment.Exit(1);
}
}
@@ -525,7 +620,7 @@ public static class Program
///
/// Interactive menu shown when no CLI args. Returns the layer keyword (core/comprehensive/edge/all) or null on Quit.
///
- private static string? ShowInteractiveMenu()
+ private static (string layer, string serializerMode)? ShowInteractiveMenu()
{
System.Console.WriteLine();
System.Console.WriteLine("╔══════════════════════════════════════════════════════════╗");
@@ -538,18 +633,22 @@ public static class Program
System.Console.WriteLine(" [2] Comprehensive — release validation");
System.Console.WriteLine(" [3] Edge cases — refactor verification");
System.Console.WriteLine(" [A] All layers");
+ System.Console.WriteLine(" [P] AsyncPipe — streaming I/O isolation (only AsyncPipe, all test data)");
System.Console.WriteLine(" [Q] Quit");
System.Console.Write("\nSelection: ");
+
var key = System.Console.ReadKey(intercept: false).KeyChar;
System.Console.WriteLine();
+
return char.ToLower(key) switch
{
- '1' => "core",
- '2' => "comprehensive",
- '3' => "edge",
- 'a' => "all",
+ '1' => ("core", "standard"),
+ '2' => ("comprehensive", "standard"),
+ '3' => ("edge", "standard"),
+ 'a' => ("all", "standard"),
+ 'p' => ("all", "asyncpipe"),
'q' => null,
- _ => "core"
+ _ => ("all", "standard")
};
}
@@ -567,8 +666,6 @@ public static class Program
// P3 will add: "ColdStart", "VeryLarge", "PathologicalString", etc.
var edgeExtras = new string[] { /* P3 */ };
- bool StartsWithAny(string name, string[] prefixes) => prefixes.Any(p => name.StartsWith(p));
-
return layer switch
{
"core" => all.Where(t => StartsWithAny(t.Name, coreNames)).ToList(),
@@ -576,6 +673,8 @@ public static class Program
"edge" => all.Where(t => StartsWithAny(t.Name, coreNames) || StartsWithAny(t.Name, comprehensiveExtras) || StartsWithAny(t.Name, edgeExtras)).ToList(),
_ => all.ToList()
};
+
+ static bool StartsWithAny(string name, string[] prefixes) => prefixes.Any(name.StartsWith);
}
#endregion
@@ -622,7 +721,7 @@ public static class Program
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
- public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}";
+ public string OptionsDescription => BuildAcBinaryOptionsDescription(_options);
public AcBinaryBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
@@ -783,7 +882,7 @@ public static class Program
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
- public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B";
+ public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B");
public AcBinaryFreshBufferWriterBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
@@ -881,7 +980,7 @@ public static class Program
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
public bool IsRoundTripOnly => true;
- public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)";
+ public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)");
public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
@@ -1047,7 +1146,7 @@ public static class Program
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes { get; }
- public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}";
+ public string OptionsDescription => BuildAcBinaryOptionsDescription(_options);
public AcBinaryBufferWriterBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
diff --git a/AyCode.Core.Tests/TestModels/BenchmarkTestDataProvider.cs b/AyCode.Core.Tests/TestModels/BenchmarkTestDataProvider.cs
index 98c0a72..0085429 100644
--- a/AyCode.Core.Tests/TestModels/BenchmarkTestDataProvider.cs
+++ b/AyCode.Core.Tests/TestModels/BenchmarkTestDataProvider.cs
@@ -47,7 +47,7 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
- return new TestDataSet("Small (2x2x2x2)", order, iidRefPercent: 10);
+ return new TestDataSet("Small (2x2x2x2)", order, iidRefPercent: 20);
}
private static TestDataSet CreateMediumTestData(bool resetId = true)
@@ -79,7 +79,7 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
- return new TestDataSet("Medium (3x3x3x4)", order, iidRefPercent: 10);
+ return new TestDataSet("Medium (3x3x3x4)", order, iidRefPercent: 20);
}
private static TestDataSet CreateLargeTestData(bool resetId = true)
@@ -109,7 +109,7 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
- return new TestDataSet("Large (5x5x5x10)", order, iidRefPercent: 10);
+ return new TestDataSet("Large (5x5x5x10)", order, iidRefPercent: 20);
}
private static TestDataSet CreateRepeatedStringsTestData(bool resetId = true)
@@ -137,15 +137,23 @@ public static class BenchmarkTestDataProvider
sharedUser: sharedUser,
sharedPreferences: sharedPreferences);
+ // Repeated string fields — ProductName on items + PalletCode on pallets. Both are common
+ // across the hierarchy, exercising string-interning deduplication on the Default preset
+ // (which has UseStringInterning = All). Targeting ~20% repeated-string share overall.
foreach (var item in order.Items)
{
item.Status = TestStatus.Processing;
item.ProductName = "CommonProductName_RepeatedForTesting";
+
+ foreach (var pallet in item.Pallets)
+ {
+ pallet.PalletCode = "CommonPalletCode_RepeatedForTesting";
+ }
}
ClearDeepLevelRefs(order);
- return new TestDataSet("Repeated Strings (10 items)", order, iidRefPercent: 10);
+ return new TestDataSet("Repeated Strings (10 items)", order, iidRefPercent: 20);
}
private static TestDataSet CreateDeepNestedTestData(bool resetId = true)
@@ -177,17 +185,21 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
- return new TestDataSet("Deep Nested (2x4x4x8)", order, iidRefPercent: 10);
+ return new TestDataSet("Deep Nested (2x4x4x8)", order, iidRefPercent: 20);
}
private static void ClearDeepLevelRefs(TestOrder order)
{
+ // Keep shared IId refs at the pallet level (Tag + Inspector) — these contribute the bulk of
+ // the ~20% IId-ref share that the test data targets. Only Category is cleared at this level
+ // (clearing just one of the three pallet-level refs keeps the share moderate). Refs at the deeper
+ // measurement / point levels are cleared entirely so deep-tree ref noise does not push the share beyond ~20%.
foreach (var item in order.Items)
{
foreach (var pallet in item.Pallets)
{
- pallet.Tag = null;
- pallet.Inspector = null;
+ // pallet.Tag = null; // KEEP for ~20% IId-ref share (was cleared)
+ // pallet.Inspector = null; // KEEP for ~20% IId-ref share (was cleared)
pallet.Category = null;
foreach (var measurement in pallet.Measurements)
diff --git a/AyCode.Core.Tests/TestModels/SharedTestModels.cs b/AyCode.Core.Tests/TestModels/SharedTestModels.cs
index 78dd3e7..01a94fc 100644
--- a/AyCode.Core.Tests/TestModels/SharedTestModels.cs
+++ b/AyCode.Core.Tests/TestModels/SharedTestModels.cs
@@ -55,7 +55,7 @@ public enum TestUserRole
/// Implements IId<int> for semantic $id/$ref serialization.
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class SharedTag : IId
{
@@ -80,7 +80,7 @@ public partial class SharedTag : IId
/// Shared category - for hierarchical cross-reference testing.
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class SharedCategory : IId
{
@@ -106,7 +106,7 @@ public partial class SharedCategory : IId
/// Shared user reference - appears in many places to test $ref deduplication.
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class SharedUser : IId
{
@@ -136,7 +136,7 @@ public partial class SharedUser : IId
/// User preferences - non-IId nested object
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class UserPreferences
{
@@ -162,7 +162,7 @@ public partial class UserPreferences
/// Does NOT implement IId, so uses standard Newtonsoft reference tracking.
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class MetadataInfo
{
@@ -190,7 +190,7 @@ public partial class MetadataInfo
/// Level 1: Main order - root of the hierarchy
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestOrder : IId
{
@@ -249,7 +249,7 @@ public partial class TestOrder : IId
///
/// Level 1: Main order - root of the hierarchy
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public partial class TestOrder_Circ_Ref : IId
{
public int Id { get; set; }
@@ -284,7 +284,7 @@ public partial class TestOrder_Circ_Ref : IId
/// Level 2: Order item with pallets
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestOrderItem : IId
{
@@ -323,7 +323,7 @@ public partial class TestOrderItem : IId
///
/// Level 2: Order item with pallets
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public partial class TestOrderItem_Circ_Ref : IId
{
public int Id { get; set; }
@@ -347,7 +347,7 @@ public partial class TestOrderItem_Circ_Ref : IId
/// Level 3: Pallet containing measurements
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestPallet : IId
{
@@ -390,7 +390,7 @@ public partial class TestPallet : IId
/// Level 4: Measurement with multiple points
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestMeasurement : IId
{
@@ -425,7 +425,7 @@ public partial class TestMeasurement : IId
/// Level 5: Deepest level - measurement point
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestMeasurementPoint : IId
{
@@ -459,7 +459,7 @@ public partial class TestMeasurementPoint : IId
///
/// Order with Guid Id - for testing Guid-based IId
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class TestGuidOrder : IId
{
public Guid Id { get; set; }
@@ -471,7 +471,7 @@ public class TestGuidOrder : IId
///
/// Item with Guid Id
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class TestGuidItem : IId
{
public Guid Id { get; set; }
@@ -487,7 +487,7 @@ public class TestGuidItem : IId
/// Simulates NopCommerce GenericAttribute - stores key-value pairs where DateTime values
/// are stored as strings in the database.
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class TestGenericAttribute
{
public int Id { get; set; }
@@ -499,7 +499,7 @@ public class TestGenericAttribute
/// DTO with GenericAttributes collection - simulates OrderDto with string-stored DateTime values.
/// This reproduces the production bug where Binary serialization was thought to corrupt DateTime strings.
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class TestDtoWithGenericAttributes : IId
{
public int Id { get; set; }
@@ -510,7 +510,7 @@ public class TestDtoWithGenericAttributes : IId
///
/// Order with nullable collections for null vs empty testing
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class TestOrderWithNullableCollections
{
public int Id { get; set; }
@@ -523,7 +523,7 @@ public class TestOrderWithNullableCollections
/// Class with all primitive types for WASM/serialization testing
///
[MemoryPackable]
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public partial class PrimitiveTestClass
{
public int IntValue { get; set; }
@@ -546,7 +546,7 @@ public partial class PrimitiveTestClass
/// Class with extended primitive types for full serializer coverage.
/// Includes DateTimeOffset, TimeSpan, Dictionary, null properties.
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class ExtendedPrimitiveTestClass
{
public int Id { get; set; }
@@ -576,7 +576,7 @@ public class ExtendedPrimitiveTestClass
///
/// Class with array of objects containing null items for WriteNull coverage
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class ObjectWithNullItems
{
public int Id { get; set; }
@@ -591,7 +591,7 @@ public class ObjectWithNullItems
/// "Server-side" DTO with extra properties that the "client" doesn't know about.
/// Used to test SkipValue functionality when deserializing unknown properties.
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class ServerCustomerDto : IId
{
public int Id { get; set; }
@@ -624,7 +624,7 @@ public class ServerCustomerDto : IId
/// the deserializer must skip unknown properties correctly
/// while still maintaining string intern table consistency.
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class ClientCustomerDto : IId
{
public int Id { get; set; }
@@ -638,7 +638,7 @@ public class ClientCustomerDto : IId
/// Server DTO with nested objects that client doesn't know about.
/// Tests skipping complex nested structures.
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class ServerOrderWithExtras : IId
{
public int Id { get; set; }
@@ -659,7 +659,7 @@ public class ServerOrderWithExtras : IId
///
/// Client version of the order - doesn't have Customer/RelatedCustomers properties.
///
-[AcBinarySerializable(false)]
+[AcBinarySerializable(false, true, false, false)]
public class ClientOrderSimple : IId
{
public int Id { get; set; }
diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
index 52d1fd7..900476a 100644
--- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
+++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs
@@ -301,6 +301,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
ArrayPool.Shared.Return(_ownedBuffer);
_ownedBuffer = null;
}
+
_hasOwnedBuffer = false;
}
diff --git a/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md
new file mode 100644
index 0000000..43f42e5
--- /dev/null
+++ b/AyCode.Core/docs/BINARY/BINARY_ASYNCPIPE_ISSUES.md
@@ -0,0 +1,161 @@
+# AsyncPipe — Known Issues & Limitations
+
+Streaming I/O layer for the binary serializer (`AsyncPipeReaderInput` + `AsyncPipeWriterOutput`):
+sliding-window buffer, multi-message wire framing (`[201][UINT16][data]...[202]`), producer-consumer
+synchronization. Issues on this page cover the **streaming protocol**, NOT the binary serializer core
+(see [`BINARY_ISSUES.md`](BINARY_ISSUES.md) for the core). Planned / actionable work tracked in
+[`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md).
+
+## Streaming protocol — open issues
+
+### ACCORE-BIN-I-Q4T8: AsyncPipeReaderInput multi-message reuse not supported
+
+**Status:** Open
+**Affects:** `AsyncPipeReaderInput` (both raw mode `multiMessage: false` and framed mode)
+**Reach:** any long-lived pipe transport (NamedPipe, FileStream, NetworkStream) that needs to stream multiple AcBinary messages over a single connection without setting up a fresh input per message.
+
+**Symptom:** Calling `AcBinaryDeserializer.Deserialize(input, opts)` repeatedly on the same long-lived `AsyncPipeReaderInput` instance silently corrupts data on the second and subsequent calls. In raw mode the second call re-reads the FIRST message's bytes from buffer position 0; in framed mode the `[202]` CHUNK_END marker leaves `_completed = true` permanently, breaking any further reads.
+
+**Root cause:**
+
+1. `AsyncPipeReaderInput.Initialize(out buffer, out position, out bufferLength)` always emits `position = 0` — no "consumed offset" / "next-message-start" cursor is preserved between calls.
+
+2. The sliding-window reset-to-0 in `AppendToBuffer` (`if (rp > 0 && rp == _writePos) → reset both to 0`) only fires when the consumer has caught up via `TryAdvanceSegment`. For small messages that fit entirely inside the initial buffer capacity (4-8 KB), `TryAdvanceSegment` is **never** called during deserialization — so `_readPos` stays at 0, the reset never fires, and a second `Deserialize(input)` call starts reading the same buffer from offset 0 again (= duplicate of message #1).
+
+3. `Complete()` (called explicitly OR implicitly via the `[202]` CHUNK_END marker in framed mode) sets `_completed = true` irreversibly. There is no Reset / Reopen method.
+
+**Workarounds:**
+
+- **Per-message fresh `AsyncPipeReaderInput` instance** (the canonical pattern, used internally by `DeserializeFromPipeReaderAsync`): `using var input = new AsyncPipeReaderInput(...);` + drain Task + deserialization Task per message. Cost: one ArrayPool rent + one `ManualResetEventSlim` allocation per message + two `Task.Run` per message. Acceptable for typical IPC consumers but precludes any zero-alloc-per-message streaming on long-lived raw transports.
+
+- **Multiplexed wire format** (`SerializeChunkedFramed` + `AsyncPipeReaderInput(multiMessage: true)`): a parser layer above parses `[201][UINT16][data]` frames and feeds individual messages into per-message inputs. The `[202]`-end-of-message rule still applies, so each message logically needs its own input — but the parser can keep the wire-level reader long-lived. Implies pipeline parallelism (multiple in-flight messages), which may not match every consumer's needs.
+
+**Cross-references:**
+- ADR `docs/adr/0003-acbinary-streaming-receive-architecture.md` — origin of the streaming-receive architecture
+- `AcBinaryDeserializer.DeserializeFromPipeReaderAsync` — reference single-message usage pattern
+- `AsyncPipeReaderInput.Initialize` / `TryAdvanceSegment` / `AppendToBuffer` — the three sites that together enforce single-use semantics
+- Tracked fix: [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-r5k2`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-r5k2-multi-message-reuse-for-asyncpipereaderinput)
+
+### ACCORE-BIN-I-W6K4: Disposal trap — `IDisposable.Dispose()` does not guarantee consumer wakeup
+
+**Status:** Open
+**Affects:** `AsyncPipeReaderInput.Dispose()` vs consumer-thread wait-primitive
+
+If the stream lifecycle is torn down without an explicit "session-end" signal first, a consumer thread blocked in the input's wait-mechanism (`TryAdvanceSegment`'s `ManualResetEventSlim.Wait`) may never wake up. The disposed wait-primitive then surfaces as `ObjectDisposedException` on the consumer thread.
+
+**NuGet API surface concern:** the natural .NET pattern `using var input = new AsyncPipeReaderInput(...)` does not by itself protect the consumer — calling `Complete()` before disposal is required by the current contract but is neither enforced nor documented at the API surface. Consumers expecting `Dispose()` to be a well-behaved teardown (the .NET convention) silently hit the trap.
+
+**Possible directions** tracked in [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m7b3`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-m7b3-implicit-session-end-signal-in-dispose).
+
+### ACCORE-BIN-I-D8L5: Session-end is irreversible — no reset/reopen primitive
+
+**Status:** Open
+**Affects:** `AsyncPipeReaderInput.Complete()`
+
+The session-end signal flips an internal flag (`_completed`) that cannot be undone; reusing the input instance after a `Complete()` requires constructing a fresh instance.
+
+**Reach:** any long-lived transport where failover / reconnect logic might want to reuse the input object across reconnect cycles. Currently the consumer must allocate a fresh instance per session, paying ArrayPool rent + MRES allocation overhead even when the previous instance was already cleanly drained.
+
+**Related to** ACCORE-BIN-I-Q4T8 (which surfaces the same irreversibility from the multi-message-reuse angle); fix directions explored in [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-r8h4`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-r8h4-reset-reopen-primitive-for-a-stream-after-session-end).
+
+### ACCORE-BIN-I-J2X7: Multi-pattern documentation gap on the public API surface
+
+**Status:** Open
+**Affects:** `AsyncPipeReaderInput` class summary
+
+The streaming input type can be used by consumers across several distinct producer-consumer patterns:
+
+- **Strictly-sequential single-thread**: producer and consumer interleave on one thread (typical sync IPC loop). Buffer-cycle reset and message boundary detection happen on the same call-stack.
+- **Multi-thread feed + deserialize**: dedicated drain thread feeds bytes; dedicated worker thread deserializes. Buffer-cycle reset and consumer-progress signalling cross thread boundaries.
+- **Push / event-driven**: external code calls `Feed` from arbitrary callbacks (network event handlers, etc.); deserialization is scheduled by the consumer when convenient.
+- **One-shot single-message**: a single `Feed` sequence + `Complete` lifecycle, no reuse needed.
+
+The current class summary implicitly assumes one of these patterns; the public API surface lacks an explicit pattern catalogue with recommended low-level vs. higher-level API guidance per pattern. Without this, NuGet consumers may pick an API combination that does not match their threading model and run into latent correctness issues only visible under load.
+
+**Possible documentation direction** tracked in [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-s5n2`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-s5n2-pattern-catalogue-in-the-public-class-summary).
+
+## Streaming protocol — resolved issues
+
+### ACCORE-BIN-I-H4G2: Chunk-on-wire size = `chunkSize + HeaderSize` caused page-fragmentation
+
+**Status:** Resolved
+**Affects:** `AsyncPipeWriterOutput.AcquireChunk` (multi-message wire format)
+**Reach:** any page-aligned transport buffer (kernel pipe / file / network) — not transport-specific.
+
+Previously, the `chunkSize` parameter was treated as the data-payload size and the chunk-on-wire became `chunkSize + HeaderSize` bytes. For page-aligned `chunkSize` values (4 KB / 16 KB / 64 KB), every chunk-on-wire overran the matching kernel page boundary by `HeaderSize` bytes, forcing two kernel-page transfers per chunk. The producer-side `Stream.WriteAsync` saw page-fragmentation on every chunk; the consumer-side `Feed` saw chunk-headers split across boundaries.
+
+**Resolution:** the `chunkSize` parameter now denotes the **chunk-on-wire total size** (header + data); the data payload is `chunkSize - HeaderSize` in framed mode. Page-aligned `chunkSize` now lands exactly in one kernel-buffer slot, no fragmentation. Wire format unchanged. Documentation follow-up tracked in [`BINARY_ASYNCPIPE_TODO.md#accore-bin-t-g3w8`](BINARY_ASYNCPIPE_TODO.md#accore-bin-t-g3w8-transport-buffer-alignment-best-practice-doc-section).
+
+### ACCORE-BIN-I-N3P7: `Initialize` snapshot of stale `_writePos` in multi-message reuse
+
+**Status:** Resolved
+**Affects:** `AsyncPipeReaderInput.Initialize` / `AppendToBuffer` / `MessageDone` interaction
+
+Previously, when the `[202]` end-of-message marker reset only the framing state (NOT the buffer-position counters), a subsequent `Initialize` call on the calling thread could snapshot `_writePos` while it still pointed at the previous message's tail. The consumer would then re-read previous-message bytes from buffer offset 0, or — worse — observe a transient mixed state where the producer-thread's first `AppendToBuffer` for the next message had partially overwritten the buffer's start while the consumer held the stale `bufferLength`. Symptom: a later message header-byte appeared as garbage (`0xEE` in version slot, etc.), surfacing as `AcBinaryDeserializationException`.
+
+**Resolution:** `MessageDone()` (called by the consumer after a message is fully read — automatically from `Deserialize`'s finally-block) atomically resets BOTH `_writePos` AND `_readPos` to 0 on the calling thread, BEFORE the next `Initialize` runs. The next `AppendToBuffer` from the producer thread then writes from offset 0 without needing a buffer-cycle, and the consumer always sees a fresh `bufferLength` snapshot.
+
+## Wire format / Multi-message protocol limits
+
+The multi-message wire format (`AsyncPipeWriterOutput` + `AsyncPipeReaderInput` with `multiMessage = true`) is designed for **a single sequential producer + a single sequential consumer** sharing one long-lived stream. The format is `[201][UINT16 size][data]...[202]` per message, with no stream-id, no message-id, and no out-of-band recovery primitive. Real-world deployments may need patterns the format does not natively support — these limits are not bugs but **architectural decisions documented as constraints**.
+
+### ACCORE-BIN-I-M9P3: No native chunk-multiplexing for concurrent writers
+
+**Status:** Open (intentional limit)
+**Affects:** `AsyncPipeWriterOutput` (multi-message wire format `[201][UINT16][data]...[202]`)
+**Reach:** any deployment that wants multiple producers (or multiple logical streams) to interleave on a single long-lived `PipeWriter`.
+
+**Symptom:** If two threads concurrently call `AcBinarySerializer.SerializeChunkedFramed(..., sharedPipeWriter, ...)` on the same `PipeWriter` without external synchronization, their `[201][UINT16][data]` chunks interleave on the wire. The receiver's `AsyncPipeReaderInput` framing-state-machine has no way to demultiplex — the chunks all flow into one `_buffer`, and the next `Deserialize` reads a corrupted blend of both messages.
+
+**Root cause:** The wire format has no **stream-id** field in the chunk header. `[201][UINT16 size][data]` only encodes the size, not which logical stream the chunk belongs to. A receiver assumes a strictly sequential producer; concurrent producers must serialize externally (e.g. a `SemaphoreSlim` around `SerializeChunkedFramed`).
+
+**Why intentional:** Adding a stream-id would widen the chunk header (extra UINT8/UINT16 per chunk) — measurable wire overhead for the typical case (single producer / sequential RPC pipeline / hub-style message dispatch). The cost-benefit favours keeping the header lean and pushing concurrency to the application layer (one writer per logical stream).
+
+**Workarounds:**
+- **External lock around the writer**: `lock` / `SemaphoreSlim` ensuring at most one in-flight `SerializeChunkedFramed` call per `PipeWriter`. Producer-side concern.
+- **One PipeWriter per logical stream**: each stream gets its own connection / pipe instance. Best fit for transport-level multiplexing (e.g. HTTP/2 streams, multiple WebSocket frames).
+- **Out-of-band protocol layer**: build a stream-id wrapper above `SerializeChunkedFramed` (`[stream-id-byte][serialized-bytes]`); the receiver dispatches to per-stream `AsyncPipeReaderInput` instances. Possible but currently no built-in support.
+
+**Cross-references:**
+- Wire format definition: `AsyncPipeWriterOutput.cs` (`ChunkDataMarker = 201`, `ChunkEndMarker = 202`)
+- The `[200]` CHUNK_START marker is reserved/tolerated but currently has no semantics — could host a future stream-id extension if needed (out-of-scope today)
+
+### ACCORE-BIN-I-T6V2: Single fixed type per long-lived stream
+
+**Status:** Open (intentional limit)
+**Affects:** `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput input, AcBinarySerializerOptions options)`
+**Reach:** any consumer that wants a long-lived `AsyncPipeReaderInput` to receive **mixed-type messages** (e.g. `Request` then `Response` then `Heartbeat` on the same connection — the typical RPC/Hub pattern).
+
+**Symptom:** The `Deserialize<T>` overload is parameterized on a single `T`. If consecutive messages on the same long-lived input are of different types, the consumer has no way (within the AcBinary API surface) to know which `T` to pass for the next call.
+
+**Root cause:** AcBinary's wire format does **not** include a type-discriminator before the payload. The serializer writes the object graph directly, and the deserializer must know the target type up-front. This is by design — the format is optimized for size, not for self-description.
+
+**Why intentional:** Type discriminators (4-byte hash, length-prefixed type-name string, etc.) cost wire bytes per message and require shared registries between producer and consumer. The framework keeps these concerns out of the AcBinary core and pushes them to the **dispatch layer** above (where they can be application-tuned: short tags, hash maps, type-id enums).
+
+**Workarounds:**
+- **Tag-based dispatch above AcBinary**: prefix each message with an `int` (or enum) tag that the consumer reads first to choose which `Deserialize<T>` to call. The consumer encapsulates the tag-read + type-dispatch in its own deserialization wrapper.
+- **Polymorphic envelope type**: define a single `T = Envelope` containing a discriminator field + raw payload bytes; the consumer deserializes the envelope, switches on the discriminator, and re-deserializes the payload as the concrete type from the inner `byte[]`. Adds a small layer of indirection but works on top of the fixed-`T` `Deserialize<T>`.
+- **One input per type-stream**: separate streams per message-class. Practical when the type-set is small and the transport can afford multiple connections.
+
+### ACCORE-BIN-I-Z2X9: Wire-format has no built-in cancel/timeout recovery
+
+**Status:** Open (intentional limit)
+**Affects:** `AsyncPipeReaderInput` framing state machine (multi-message mode)
+**Reach:** any deployment that wants to recover a long-lived stream from a partial/aborted message (sender crash mid-message, network timeout mid-chunk, application cancellation mid-`SerializeChunkedFramed`).
+
+**Symptom:** If a sender starts writing a multi-chunk message — `[201][UINT16=N][partial-data-of-K-bytes-where-K)`, `Complete()`. Consumer API (IBinaryInputBase): `Initialize` / `TryAdvanceSegment` / `Release`.
+
+Existing `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` remain unchanged in this step — they keep serving the SignalR `AcBinaryHubProtocol.TryParseChunkData` path. Migration to the new class is in Step 6 (`ACCORE-SBP-T-G7T2`).
+
+**Naming rationale:** `AsyncPipeReaderInput` mirrors the existing send-side `AsyncPipeWriterOutput`. The `Async` prefix follows .NET BCL convention for type-level naming (`AsyncEnumerable`, `IAsyncDisposable`, `AsyncLocal`).
+
+**Acceptance:**
+- New class compiles; isolated unit tests cover `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts (incl. producer-consumer concurrency, missed-signal double-check, grow-buffer handoff race).
+- Existing SignalR tests continue to pass on the unchanged `SegmentBufferReader` path (no behavioral regression).
+
+## ACCORE-BIN-T-M2K1: Add `AsyncPipeReaderInput.DrainFromAsync` extension (Step 2 of ADR-0003)
+**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 2 · **Depends on:** `ACCORE-BIN-T-D6H4`
+
+Add `public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken ct)` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs` (NEW file). Pulls from a `System.IO.Pipelines.PipeReader` and feeds the input via repeated `Feed(span)` calls; calls `Complete()` at end-of-stream.
+
+Separate file (not a method on the class) so the core `AsyncPipeReaderInput.cs` does not import `System.IO.Pipelines` in its primary contract — the pull-mode is opt-in at use-sites.
+
+**Acceptance:**
+- Extension drains an in-memory `Pipe` end-to-end in a unit test (write some bytes → DrainFromAsync → assert AsyncPipeReaderInput state).
+- `Complete()` correctly invoked at end-of-stream (`result.IsCompleted`).
+
+## ACCORE-BIN-T-V7C9: Replace misleading parallel test with real parallel pipeline test (Step 3 of ADR-0003)
+**Priority:** P1 · **Type:** Test · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 3 · **Depends on:** `ACCORE-BIN-T-M2K1`
+
+The current `AcBinarySerializerPipeParallelTests.cs` is misleading — it does not actually exercise serializer↔deserializer parallelism (single-threaded in practice). Rewrite it to drive a producer thread (serializer) and a consumer thread (deserializer) through an in-memory `Pipe`, with `AsyncPipeReaderInput.DrainFromAsync` on the receive side. Measure the serialize/deserialize overlap and verify the ~1 µs / MB perf delta claimed in ADR-0003 vs. today's struct-based path.
+
+**Acceptance:**
+- Test passes consistently on Windows + Linux CI.
+- Measured perf delta documented in test output / commit message.
+- Test serves as regression guard for future receive-side changes (no silent perf-cliff regression goes undetected).
+
+## ACCORE-BIN-T-A3T8: Add NamedPipe helpers — `SerializeToNamedPipeAsync` / `DeserializeFromNamedPipeAsync` (Step 4 of ADR-0003)
+**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 4 · **Depends on:** `ACCORE-BIN-T-V7C9`
+
+Add static extension methods on `AcBinarySerializerOptions` for full NamedPipe IPC lifecycle (one-shot send / receive). New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerNamedPipeExtensions.cs`. Send: `NamedPipeServerStream` → `PipeWriter.Create(stream)` → `AsyncPipeWriterOutput`. Receive: `NamedPipeClientStream` → `PipeReader.Create(stream)` → `AsyncPipeReaderInput.DrainFromAsync`.
+
+Cross-platform: Windows + Linux (Unix-domain-socket via NamedPipe BCL API). WASM throws `PlatformNotSupportedException` per BCL contract.
+
+**Acceptance:**
+- Cross-platform integration test: roundtrip a complex object graph through a NamedPipe; assert structural equality.
+- WASM build does not link these helpers (or they throw a clear `PlatformNotSupportedException` at runtime if invoked).
+
+## ACCORE-BIN-T-B5Y6: Add FileStream helpers — `SerializeToFileStreamAsync` / `DeserializeFromFileStreamAsync` (Step 5 of ADR-0003)
+**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 5 · **Depends on:** `ACCORE-BIN-T-A3T8`
+
+Add static extension methods on `AcBinarySerializerOptions` for streaming file I/O. New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerFileStreamExtensions.cs`. Send: `FileStream.Create(path)` → `PipeWriter.Create(fileStream)` → `AsyncPipeWriterOutput`. Receive: `FileStream.OpenRead(path)` → `PipeReader.Create(fileStream)` → `AsyncPipeReaderInput.DrainFromAsync`.
+
+**Critical streaming-doctrine invariant:** peak buffer memory is bounded by `BufferWriterChunkSize × 2` (~8 KB at default), regardless of file size. **NOT file-size-aware** — do not pre-allocate to file size (that would defeat streaming and break the zero-copy / zero-alloc guarantees).
+
+**Acceptance:**
+- Large-file roundtrip test (≥ 100 MB) passes with memory profiler showing peak buffer ≤ 16 KB throughout.
+- Full structural equality of round-tripped object.
+
+## ACCORE-BIN-T-R5K2: Multi-message reuse for AsyncPipeReaderInput
+**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported)
+
+Reuse a long-lived `AsyncPipeReaderInput` across multiple `Deserialize(input, opts)` calls without setting up a fresh instance per message. Removes the per-message ArrayPool rent + `ManualResetEventSlim` allocation + two `Task.Run` calls that the canonical fresh-instance pattern requires today, opening a true zero-alloc-per-message path on long-lived raw IPC transports.
+
+**Possible directions** (none fixed; pre-design):
+
+- **`Initialize` emits `_readPos` as starting position** (instead of always 0), and the sliding-window reset becomes "anytime `_readPos > 0` after a `Deserialize` completes, reset both `_writePos` and `_readPos` to 0". Smallest API change, no public surface added. Caveat: requires the deserializer to call `TryAdvanceSegment` at least once during message read so `_readPos` reflects the consumed boundary — small fully-buffered messages currently skip it entirely.
+
+- **New `SetReadCursor(int position)` / `AdvanceReadTo(int position)` method**: caller (deserializer or wrapper) reports the consumed offset after each `Deserialize`. Sliding-window reset triggers explicitly. Cleaner separation of concerns (consumer knows where it stopped), but adds a public API surface.
+
+- **`ResetCompletion()` for framed mode**: orthogonal to the cursor design — needed for framed multi-message reuse where the `[202]` CHUNK_END marker currently makes `_completed = true` irreversible. Combine with whichever cursor design is chosen.
+
+- **Consumer-thread atomic position-reset on message-done**: a `MessageDone()`-style method called from the consumer (typically the deserializer's finally-block) atomically resets BOTH `_writePos` AND `_readPos` to 0 on the calling thread BEFORE the next `Initialize` runs. Trade-off: relies on the calling thread being the consumer; multi-thread feed+deserialize patterns need different coordination.
+
+**Acceptance** (for whichever direction is picked):
+- New tests exercise `N` consecutive `Deserialize(sharedInput, opts)` calls on the same instance, both raw and framed modes, with payload sizes both above and below the initial buffer capacity. All `N` results match their respective inputs (no buffer-position aliasing, no message-#1-duplicate-on-#2 regression).
+- Existing single-message tests continue to pass.
+- Wire format unchanged (consumer-side reader plumbing, not a wire-level change).
+- Allocation profile of `N` consecutive reads on the shared input: **0 bytes per call after warmup**.
+
+## ACCORE-BIN-T-V4Q9: Convenience API for the strictly-sequential single-thread pattern
+**Priority:** P3 · **Type:** API · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface), [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported)
+
+A higher-level wrapper for the typical single-thread sequential `Feed → Deserialize → reset` cycle. Possible directions (none committed; pre-design):
+
+- **Delegate-based** API on the input: caller passes a deserializer lambda; the input encapsulates the lifecycle (`try` / `finally` + atomic reset). Stateful overload with `state`-parameter for zero-alloc on hot paths (avoids closure capture).
+- **`IDisposable` scope object**: caller acquires a per-message scope; the scope's `Dispose` triggers reset.
+- **Method-pair on the input**: explicit `BeginMessage` / `EndMessage` in the consumer's hands.
+
+Trade-offs: delegate has runtime cost but is most ergonomic; scope-object adds one allocation per message but composes naturally with `using`; method-pair is most explicit but most error-prone (forget `EndMessage` → leak / corruption).
+
+Implementation only after the related issues are documented and the multi-pattern catalogue exists (so the convenience API's scope is clear: it covers ONE pattern, not all).
+
+## ACCORE-BIN-T-M7B3: Implicit session-end signal in `Dispose()`
+**Priority:** P3 · **Type:** API · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-w6k4`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-w6k4-disposal-trap-idisposabledispose-does-not-guarantee-consumer-wakeup)
+
+Possible directions to consider (none chosen):
+
+- **Embed an idempotent session-end signal in `Dispose()`** before tearing down the wait-primitive and buffers. The plain `using` pattern would then suffice — explicit `Complete()` becomes optional. Trade-off: subtle behavioral shift; consumers relying on the "Dispose without prior Complete" path returning data may need to migrate.
+- **Document the contract explicitly** without changing behavior: the public XML doc states the consumer-thread must be quiesced before disposal. Lowest-risk; relies on documentation.
+- **Throw on `Dispose` if `Complete` was not called**: stricter contract, surfaces the trap as a clear error rather than `ObjectDisposedException` from the wait-primitive. Breaking change.
+
+Pick a direction before implementing; each has different consumer-migration costs.
+
+## ACCORE-BIN-T-K1F6: Public exposure of the wire-frame fixed overhead
+**Priority:** P3 · **Type:** API
+
+The wire-frame header byte-size (currently a private constant on `AsyncPipeWriterOutput`) is needed by consumers calculating transport-buffer sizing — kernel pipe buffer width on `NamedPipeServerStream`, `BufferedStream` wrapper size, etc. Possible directions:
+
+- **Expose a public const** on `AsyncPipeWriterOutput` (e.g. `FrameHeaderSize`).
+- **Add a static helper** that returns the chunk-on-wire size given a chunk-data size (or vice versa).
+- **Document only**: state the framing overhead in the class XML doc and let consumers hard-code.
+
+Smallest API-surface addition that solves the consumer-side calculation problem. Pick one direction before implementing.
+
+## ACCORE-BIN-T-R8H4: Reset / reopen primitive for a stream after session-end
+**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-d8l5`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-d8l5-session-end-is-irreversible-no-reset-reopen-primitive)
+
+Possible directions for a failover / reconnect scenario (none committed):
+
+- **`Reset()` method**: a one-method primitive that flips the internal session-end flag back to off and clears the buffer. Trade-off: thread-safety contract — must match the disposal contract; partially-completed framing-state must be explicitly reset.
+- **Helper-factory wrapping the lifecycle**: a `using`-friendly factory that produces a fresh instance per session, hiding the irreversibility behind a higher-level primitive.
+- **Document only**: state that fresh-instance allocation is the supported reconnect pattern; reuse is unsupported.
+
+Each has different thread-safety implications and wire-formatting reentrancy requirements; explore before any code.
+
+## ACCORE-BIN-T-S5N2: Pattern catalogue in the public class summary
+**Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface)
+
+Extend the `AsyncPipeReaderInput` class summary with an explicit pattern catalogue: each supported producer-consumer pattern (strictly-sequential single-thread, multi-thread feed+deserialize, push/event-driven, one-shot) with its recommended API (low-level Feed/Initialize/MessageDone vs. any higher-level convenience wrapper). NuGet consumers can self-select the pattern that matches their threading model.
+
+## ACCORE-BIN-T-G3W8: Transport-buffer alignment best-practice doc section
+**Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2-chunk-on-wire-size--chunksize--headersize-caused-page-fragmentation)
+
+A documentation section covering the alignment of the user-space `chunkSize` and the transport-level kernel buffer (general principle, not transport-specific): page-aligned matching avoids fragmentation, mismatched sizes cause two kernel-page transfers per chunk. Reference benchmark archive(s) where empirical numbers exist.
diff --git a/AyCode.Core/docs/BINARY/BINARY_ISSUES.md b/AyCode.Core/docs/BINARY/BINARY_ISSUES.md
index d662996..0b87804 100644
--- a/AyCode.Core/docs/BINARY/BINARY_ISSUES.md
+++ b/AyCode.Core/docs/BINARY/BINARY_ISSUES.md
@@ -1,5 +1,7 @@
# Binary Serializer — Known Issues & Limitations
+This page covers issues in the **binary serializer core** (format, SGen, options, deserialization context, buffer writer). Issues specific to the **streaming I/O layer** (`AsyncPipeReaderInput` + `AsyncPipeWriterOutput`, multi-message wire framing, sliding-window buffer, producer-consumer synchronization) are tracked separately in [`BINARY_ASYNCPIPE_ISSUES.md`](BINARY_ASYNCPIPE_ISSUES.md).
+
## Deserialization
### ACCORE-BIN-I-D2J5: Non-array-backed memory — per-segment copy
@@ -39,34 +41,6 @@ The scratch buffer is `ArrayPool.Rent`-ed on first cross-boundary read and reuse
**Possible optimization:** Span-by-span UTF-8 decode for cross-boundary strings (like MessagePack). Low priority — most strings are shorter than a segment (4KB).
-### ACCORE-BIN-I-Q4T8: AsyncPipeReaderInput multi-message reuse not supported
-
-**Status:** Open
-**Affects:** `AsyncPipeReaderInput` (both raw mode `stripChunkFraming: false` and framed mode)
-**Reach:** any long-lived pipe transport (NamedPipe, FileStream, NetworkStream) that needs to stream multiple AcBinary messages over a single connection without setting up a fresh input per message.
-
-**Symptom:** Calling `AcBinaryDeserializer.Deserialize(input, opts)` repeatedly on the same long-lived `AsyncPipeReaderInput` instance silently corrupts data on the second and subsequent calls. In raw mode the second call re-reads the FIRST message's bytes from buffer position 0; in framed mode the `[202]` CHUNK_END marker leaves `_completed = true` permanently, breaking any further reads.
-
-**Root cause:**
-
-1. `AsyncPipeReaderInput.Initialize(out buffer, out position, out bufferLength)` always emits `position = 0` — no "consumed offset" / "next-message-start" cursor is preserved between calls.
-
-2. The sliding-window reset-to-0 in `AppendToBuffer` (`if (rp > 0 && rp == _writePos) → reset both to 0`) only fires when the consumer has caught up via `TryAdvanceSegment`. For small messages that fit entirely inside the initial buffer capacity (4-8 KB), `TryAdvanceSegment` is **never** called during deserialization — so `_readPos` stays at 0, the reset never fires, and a second `Deserialize(input)` call starts reading the same buffer from offset 0 again (= duplicate of message #1).
-
-3. `Complete()` (called explicitly OR implicitly via the `[202]` CHUNK_END marker in framed mode) sets `_completed = true` irreversibly. There is no Reset / Reopen method.
-
-**Workarounds:**
-
-- **Per-message fresh `AsyncPipeReaderInput` instance** (the canonical pattern, used internally by `DeserializeFromPipeReaderAsync`): `using var input = new AsyncPipeReaderInput(...);` + drain Task + deserialization Task per message. Cost: one ArrayPool rent + one `ManualResetEventSlim` allocation per message + two `Task.Run` per message. Acceptable for typical IPC consumers but precludes any zero-alloc-per-message streaming on long-lived raw transports.
-
-- **Multiplexed wire format** (`SerializeChunkedFramed` + `AsyncPipeReaderInput(stripChunkFraming: true)`): a parser layer above (e.g. `AcBinaryHubProtocol.TryParseChunkData`) parses `[201][UINT16][data]` frames and feeds individual messages into per-message inputs. SignalR's `BinaryProtocolMode.AsyncSegment` works this way. The `[202]`-end-of-message rule still applies, so each message logically needs its own input — but the parser can keep the wire-level reader long-lived. Implies pipeline parallelism (multiple in-flight messages), which may not match every consumer's needs.
-
-**Cross-references:**
-- ADR `docs/adr/0003-acbinary-streaming-receive-architecture.md` — origin of the streaming-receive architecture
-- `AcBinaryDeserializer.DeserializeFromPipeReaderAsync` — reference single-message usage pattern
-- `AsyncPipeReaderInput.Initialize` / `TryAdvanceSegment` / `AppendToBuffer` — the three sites that together enforce single-use semantics
-- Tracked fix: [`BINARY_TODO.md#accore-bin-t-r5k2`](BINARY_TODO.md#accore-bin-t-r5k2-multi-message-reuse-for-asyncpipereaderinput)
-
## Serialization
### ACCORE-BIN-I-K8R4: BufferWriterBinaryOutput fallback path allocates per-chunk
@@ -103,75 +77,6 @@ Same `TryGetArray` fallback as `BufferWriterBinaryOutput` (ACCORE-BIN-I-K8R4). K
Same constraint as ACCORE-BIN-I-P3M6 — `IBinaryInputBase` interface is synchronous. `ReadAsync().GetAwaiter().GetResult()` blocks when waiting for more data from the pipe. Currently not used in production (SignalR delivers complete messages via `TryParseMessage`). Reserved for future direct-pipe deserialization scenarios.
-## Wire format / Multi-message protocol limits
-
-The multi-message wire format (`AsyncPipeWriterOutput` + `AsyncPipeReaderInput` with `multiMessage = true`) is designed for **a single sequential producer + a single sequential consumer** sharing one long-lived stream. The format is `[201][UINT16 size][data]...[202]` per message, with no stream-id, no message-id, and no out-of-band recovery primitive. Real-world deployments may need patterns the format does not natively support — these limits are not bugs but **architectural decisions documented as constraints**.
-
-### ACCORE-BIN-I-M9P3: No native chunk-multiplexing for concurrent writers
-
-**Status:** Open (intentional limit)
-**Affects:** `AsyncPipeWriterOutput` (multi-message wire format `[201][UINT16][data]...[202]`)
-**Reach:** any deployment that wants multiple producers (or multiple logical streams) to interleave on a single long-lived `PipeWriter`.
-
-**Symptom:** If two threads concurrently call `AcBinarySerializer.SerializeChunkedFramed(..., sharedPipeWriter, ...)` on the same `PipeWriter` without external synchronization, their `[201][UINT16][data]` chunks interleave on the wire. The receiver's `AsyncPipeReaderInput` framing-state-machine has no way to demultiplex — the chunks all flow into one `_buffer`, and the next `Deserialize` reads a corrupted blend of both messages.
-
-**Root cause:** The wire format has no **stream-id** field in the chunk header. `[201][UINT16 size][data]` only encodes the size, not which logical stream the chunk belongs to. A receiver assumes a strictly sequential producer; concurrent producers must serialize externally (e.g. a `SemaphoreSlim` around `SerializeChunkedFramed`).
-
-**Why intentional:** Adding a stream-id would widen the chunk header (extra UINT8/UINT16 per chunk) — measurable wire overhead for the typical case (single producer / sequential RPC pipeline / SignalR-style hub message). The cost-benefit favours keeping the header lean and pushing concurrency to the application layer (one writer per logical stream).
-
-**Workarounds:**
-- **External lock around the writer**: `lock` / `SemaphoreSlim` ensuring at most one in-flight `SerializeChunkedFramed` call per `PipeWriter`. Producer-side concern.
-- **One PipeWriter per logical stream**: each stream gets its own connection / pipe instance. Best fit for transport-level multiplexing (e.g. HTTP/2 streams, multiple WebSocket frames).
-- **Out-of-band protocol layer**: build a stream-id wrapper above `SerializeChunkedFramed` (`[stream-id-byte][serialized-bytes]`); the receiver dispatches to per-stream `AsyncPipeReaderInput` instances. Possible but currently no built-in support.
-
-**Cross-references:**
-- Wire format definition: `AsyncPipeWriterOutput.cs` (`ChunkDataMarker = 201`, `ChunkEndMarker = 202`)
-- The `[200]` CHUNK_START marker is reserved/tolerated but currently has no semantics — could host a future stream-id extension if needed (out-of-scope today)
-
-### ACCORE-BIN-I-T6V2: Single fixed type per long-lived stream
-
-**Status:** Open (intentional limit)
-**Affects:** `AcBinaryDeserializer.Deserialize(AsyncPipeReaderInput input, AcBinarySerializerOptions options)`
-**Reach:** any consumer that wants a long-lived `AsyncPipeReaderInput` to receive **mixed-type messages** (e.g. `Request` then `Response` then `Heartbeat` on the same connection — the typical RPC/Hub pattern).
-
-**Symptom:** The `Deserialize` overload is parameterized on a single `T`. If consecutive messages on the same long-lived input are different types, the consumer has no way (within the AcBinary API surface) to know which `T` to pass for the next call.
-
-**Root cause:** AcBinary's wire format does **not** include a type-discriminator before the payload. The serializer writes the object graph directly, and the deserializer must know the target type up-front. This is by design — the format is optimized for size, not for self-description.
-
-**Why intentional:** Type discriminators (4-byte hash, length-prefixed type-name string, etc.) cost wire bytes per message and require shared registries between producer and consumer. The framework keeps these concerns out of the AcBinary core and pushes them to the **dispatch layer** above (where they can be application-tuned: short tags, hash maps, type-id enums).
-
-**Workarounds:**
-- **Tag-based dispatch above AcBinary**: prefix each message with an `int` (or enum) tag the consumer reads first to choose `Deserialize`. This is exactly how `AcBinaryHubProtocol` already does it: `SignalRCrudTags`-style integer tags identify the message class on the wire.
-- **Polymorphic envelope type**: define a single `T = Envelope` containing a discriminator field + raw payload bytes; the consumer deserializes the envelope, switches on the discriminator, and re-deserializes the payload as the concrete type from the inner `byte[]`. Adds a small layer of indirection but works on top of fix-T `Deserialize`.
-- **One input per type-stream**: separate streams per message-class. Practical when the type-set is small and the transport can afford multiple connections.
-
-**Cross-references:**
-- `AcBinaryHubProtocol` uses tag-based dispatch — canonical real-world pattern (see `AyCode.Services/SignalRs/`).
-- `SignalRCrudTags`, `AcSignalRDataSource` — examples of tag-driven message classification.
-
-### ACCORE-BIN-I-Z2X9: Wire-format has no built-in cancel/timeout recovery
-
-**Status:** Open (intentional limit)
-**Affects:** `AsyncPipeReaderInput` framing state machine (multi-message mode)
-**Reach:** any deployment that wants to recover a long-lived stream from a partial/aborted message (sender crash mid-message, network timeout mid-chunk, application cancellation mid-`SerializeChunkedFramed`).
-
-**Symptom:** If a sender starts writing a multi-chunk message — `[201][UINT16=N][partial-data-of-K-bytes-where-K)`, `Complete()`. Consumer API (IBinaryInputBase): `Initialize` / `TryAdvanceSegment` / `Release`.
-
-Existing `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` remain unchanged in this step — they keep serving the SignalR `AcBinaryHubProtocol.TryParseChunkData` path. Migration to the new class is in Step 6 (`ACCORE-SBP-T-G7T2`).
-
-**Naming rationale:** `AsyncPipeReaderInput` mirrors the existing send-side `AsyncPipeWriterOutput`. The `Async` prefix follows .NET BCL convention for type-level naming (`AsyncEnumerable`, `IAsyncDisposable`, `AsyncLocal`).
-
-**Acceptance:**
-- New class compiles; isolated unit tests cover `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts (incl. producer-consumer concurrency, missed-signal double-check, grow-buffer handoff race).
-- Existing SignalR tests continue to pass on the unchanged `SegmentBufferReader` path (no behavioral regression).
-
-## ACCORE-BIN-T-M2K1: Add `AsyncPipeReaderInput.DrainFromAsync` extension (Step 2 of ADR-0003)
-**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 2 · **Depends on:** `ACCORE-BIN-T-D6H4`
-
-Add `public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken ct)` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs` (NEW file). Pulls from a `System.IO.Pipelines.PipeReader` and feeds the input via repeated `Feed(span)` calls; calls `Complete()` at end-of-stream.
-
-Separate file (not a method on the class) so the core `AsyncPipeReaderInput.cs` does not import `System.IO.Pipelines` in its primary contract — the pull-mode is opt-in at use-sites.
-
-**Acceptance:**
-- Extension drains an in-memory `Pipe` end-to-end in a unit test (write some bytes → DrainFromAsync → assert AsyncPipeReaderInput state).
-- `Complete()` correctly invoked at end-of-stream (`result.IsCompleted`).
-
-## ACCORE-BIN-T-V7C9: Replace misleading parallel test with real parallel pipeline test (Step 3 of ADR-0003)
-**Priority:** P1 · **Type:** Test · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 3 · **Depends on:** `ACCORE-BIN-T-M2K1`
-
-The current `AcBinarySerializerPipeParallelTests.cs` is misleading — it does not actually exercise serializer↔deserializer parallelism (single-threaded in practice). Rewrite to drive a producer thread (serializer) and a consumer thread (deserializer) through an in-memory `Pipe`, with `AsyncPipeReaderInput.DrainFromAsync` on the receive side. Measure ser+deser overlap and verify the ADR-0003 claimed ~1 µs / MB perf delta vs today's struct-based path.
-
-**Acceptance:**
-- Test passes consistently on Windows + Linux CI.
-- Measured perf delta documented in test output / commit message.
-- Test serves as regression guard for future receive-side changes (no silent perf-cliff regression goes undetected).
-
-## ACCORE-BIN-T-A3T8: Add NamedPipe helpers — `SerializeToNamedPipeAsync` / `DeserializeFromNamedPipeAsync` (Step 4 of ADR-0003)
-**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 4 · **Depends on:** `ACCORE-BIN-T-V7C9`
-
-Add static extension methods on `AcBinarySerializerOptions` for full NamedPipe IPC lifecycle (one-shot send / receive). New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerNamedPipeExtensions.cs`. Send: `NamedPipeServerStream` → `PipeWriter.Create(stream)` → `AsyncPipeWriterOutput`. Receive: `NamedPipeClientStream` → `PipeReader.Create(stream)` → `AsyncPipeReaderInput.DrainFromAsync`.
-
-Cross-platform: Windows + Linux (Unix-domain-socket via NamedPipe BCL API). WASM throws `PlatformNotSupportedException` per BCL contract.
-
-**Acceptance:**
-- Cross-platform integration test: roundtrip a complex object graph through a NamedPipe; assert structural equality.
-- WASM build does not link these helpers (or throws clear PNS at runtime if invoked).
-
-## ACCORE-BIN-T-B5Y6: Add FileStream helpers — `SerializeToFileStreamAsync` / `DeserializeFromFileStreamAsync` (Step 5 of ADR-0003)
-**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 5 · **Depends on:** `ACCORE-BIN-T-A3T8`
-
-Add static extension methods on `AcBinarySerializerOptions` for streaming file I/O. New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerFileStreamExtensions.cs`. Send: `FileStream.Create(path)` → `PipeWriter.Create(fileStream)` → `AsyncPipeWriterOutput`. Receive: `FileStream.OpenRead(path)` → `PipeReader.Create(fileStream)` → `AsyncPipeReaderInput.DrainFromAsync`.
-
-**Critical streaming-doctrine invariant:** peak buffer memory bounded by `BufferWriterChunkSize × 2` (~8 KB at default), regardless of file size. **NOT file-size-aware** — do not pre-allocate to file size (would defeat streaming and break zerocopy / zeroalloc).
-
-**Acceptance:**
-- Large-file roundtrip test (≥ 100 MB) passes with memory profiler showing peak buffer ≤ 16 KB throughout.
-- Full structural equality of round-tripped object.
-
## ACCORE-BIN-T-K3W7: Rename `BufferWriterChunkSize` to reflect actual semantics
-**Priority:** P3 · **Type:** Refactor · **Breaking:** Yes (public option API)
+**Priority:** P3 · **Type:** Refactor · **Breaking:** Yes (public option API) · **Streaming impact:** see [`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md) for the streaming-side companion considerations (chunk-on-wire vs internal-buffer semantics)
The property name `BufferWriterChunkSize` is misleading: across the three output paths it does NOT consistently represent a "chunk".
@@ -202,21 +148,3 @@ Pick one before touching code. Option 2 is the most correct but adds API surface
- Wire format unchanged. Default values unchanged (65535 / equivalent).
- Migration note in CHANGELOG / release notes since this is a breaking change to `AcBinarySerializerOptions`.
-## ACCORE-BIN-T-R5K2: Multi-message reuse for AsyncPipeReaderInput
-**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ISSUES.md#accore-bin-i-q4t8`](BINARY_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported) — full Symptom / Root cause / Workarounds documented there; **do not duplicate here**.
-
-Add a "next message" cursor / reset semantics so a long-lived `AsyncPipeReaderInput` can be reused across multiple `Deserialize(input, opts)` calls without setting up a fresh instance per message. Removes the per-message ArrayPool rent + `ManualResetEventSlim` allocation + two `Task.Run` calls that the canonical pattern (`DeserializeFromPipeReaderAsync`) requires today, opening a true zero-alloc-per-message path on long-lived raw IPC transports (NamedPipe, FileStream, NetworkStream).
-
-**Design candidates** (pick one — prototype first, measure the small-message zero-alloc claim before committing):
-
-- **A. `Initialize` emits `_readPos` as starting position** (instead of always 0), and the sliding-window reset becomes "anytime `_readPos > 0` after a `Deserialize` completes, reset both `_writePos` and `_readPos` to 0". Smallest API change, no public surface added. Caveat: requires the deserializer to call `TryAdvanceSegment` at least once during message read so `_readPos` reflects the consumed boundary — small fully-buffered messages currently skip it entirely.
-
-- **B. New `SetReadCursor(int position)` / `AdvanceReadTo(int position)` method**: caller (deserializer or wrapper) reports the consumed offset after each `Deserialize`. Sliding-window reset triggers explicitly. Cleaner separation of concerns (consumer knows where it stopped), but adds a public API surface.
-
-- **C. `ResetCompletion()` for framed mode**: orthogonal to A/B — needed for framed multi-message reuse where the `[202]` CHUNK_END marker currently makes `_completed = true` irreversible. Combine with whichever cursor design is chosen.
-
-**Acceptance:**
-- New tests exercise `N` consecutive `Deserialize(sharedInput, opts)` calls on the same instance, both raw and framed modes, with payload sizes both above and below the initial buffer capacity. All `N` results match their respective inputs (no buffer-position aliasing, no message-#1-duplicate-on-#2 regression).
-- Existing `DeserializeFromPipeReaderAsync` unit tests continue to pass (single-message path unchanged).
-- Wire format unchanged (this is consumer-side reader plumbing, not a wire-level change).
-- Allocation profile of `N` consecutive reads on the shared input: **0 bytes per call after warmup** (ArrayPool rent reused across calls, no MRES per call, no `Task.Run` per call). The deserialized object graph allocations stay (those are user-visible).
diff --git a/AyCode.Core/docs/BINARY/README.md b/AyCode.Core/docs/BINARY/README.md
index a22180e..44606a1 100644
--- a/AyCode.Core/docs/BINARY/README.md
+++ b/AyCode.Core/docs/BINARY/README.md
@@ -10,8 +10,10 @@ Reference documentation for the AcBinary serialization system. Primary goal: **s
- [`BINARY_IMPLEMENTATION.md`](BINARY_IMPLEMENTATION.md) — Internal implementation details
- [`BINARY_WRITERS.md`](BINARY_WRITERS.md) — Writer internals (streaming, buffering)
- [`BINARY_SGEN.md`](BINARY_SGEN.md) — Source generator (`AyCode.Core.Serializers.SourceGenerator`)
-- [`BINARY_ISSUES.md`](BINARY_ISSUES.md) — Known issues and limitations
-- [`BINARY_TODO.md`](BINARY_TODO.md) — Planned work / open tickets
+- [`BINARY_ISSUES.md`](BINARY_ISSUES.md) — Known issues and limitations (binary serializer core)
+- [`BINARY_TODO.md`](BINARY_TODO.md) — Planned work / open tickets (binary serializer core)
+- [`BINARY_ASYNCPIPE_ISSUES.md`](BINARY_ASYNCPIPE_ISSUES.md) — Known issues and limitations (streaming I/O layer: `AsyncPipeReaderInput` + `AsyncPipeWriterOutput`)
+- [`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md) — Planned work / open tickets (streaming I/O layer)
## Start here
@@ -25,4 +27,4 @@ For a first read-through, start with [`BINARY_FEATURES.md`](BINARY_FEATURES.md)
## Related ADRs
-- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — *AcBinary streaming receive — AsyncPipeReaderInput unified primitive and transport-agnostic helpers* (Status: Proposed (2026-04-27)). Repo-level cross-cutting ADR establishing the receive-side streaming architecture and transport-agnostic helpers (NamedPipe + FileStream) for this serializer. `AsyncPipeReaderInput` (sealed class) consolidates today's `SegmentBufferReader` + `SegmentBufferReaderInput` pair into a single self-contained primitive; the `Async`-prefixed naming mirrors the existing `AsyncPipeWriterOutput` send-side primitive. Implementation tracked across `ACCORE-BIN-T-D6H4` / `M2K1` / `V7C9` / `A3T8` / `B5Y6` (Steps 1–5) in `BINARY_TODO.md` and `ACCORE-SBP-T-G7T2` (Step 6) in `AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md`.
+- [`AyCode.Core/docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) — *AcBinary streaming receive — AsyncPipeReaderInput unified primitive and transport-agnostic helpers* (Status: Proposed, 2026-04-27). Repo-level cross-cutting ADR establishing the receive-side streaming architecture and transport-agnostic helpers (NamedPipe + FileStream) for this serializer. `AsyncPipeReaderInput` (sealed class) consolidates today's `SegmentBufferReader` + `SegmentBufferReaderInput` pair into a single self-contained primitive; the `Async`-prefixed naming mirrors the existing `AsyncPipeWriterOutput` send-side primitive. Implementation is tracked across `ACCORE-BIN-T-D6H4` / `M2K1` / `V7C9` / `A3T8` / `B5Y6` (Steps 1–5) in [`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md) and `ACCORE-SBP-T-G7T2` (Step 6) in `AyCode.Services/docs/SIGNALR_BINARY_PROTOCOL/SIGNALR_BINARY_PROTOCOL_TODO.md`.