[LOADED_DOCS: 8 files, no new loads]

Add AsyncPipe streaming mode, doc split, and test data tweaks

- Add AsyncPipe-only streaming mode to benchmark suite (CLI/menu)
- Aggregate and display AcBinarySerializableAttribute flags in options output
- Raise IId-ref and repeated-string share in all test data to ~20%
- Use explicit AcBinarySerializable(false, true, ...) on all test models
- Split streaming I/O issues/TODOs into BINARY_ASYNCPIPE_ISSUES.md and BINARY_ASYNCPIPE_TODO.md
- Update README and references for new streaming doc structure
- Minor code and doc cleanups for clarity and accuracy
This commit is contained in:
Loretta 2026-05-01 09:31:46 +02:00
parent 6dbeae9884
commit 4375ca5b4a
9 changed files with 503 additions and 255 deletions

View File

@ -1,4 +1,5 @@
using AyCode.Core.Compression;
using AyCode.Core.Serializers.Attributes;
using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.TestModels;
using MemoryPack;
@ -9,6 +10,7 @@ using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
@ -64,12 +66,59 @@ public static class Program
private static readonly UTF8Encoding Utf8NoBom = new(encoderShouldEmitUTF8Identifier: false);
/// <summary>
/// Aggregated <see cref="AcBinarySerializableAttribute"/> feature flags across every type tagged with
/// the attribute in the loaded assemblies. Cached on first access (single reflection scan at startup).
/// Used by <see cref="BuildAcBinaryOptionsDescription"/> so the per-row Options column shows BOTH the
/// configured options-level value AND the effective attribute-level enable flag — a feature flagged
/// off at the type level overrides the options regardless of preset, and that asymmetry must surface
/// in the log to avoid misreading a "RefHandling=OnlyId" / "Interning=All" line as actually active.
/// Aggregation rule: if ALL tagged types have the feature enabled → <c>true</c>; if any tagged type
/// disables it → <c>false</c> (a single disabling type suppresses the feature on the type-graph).
/// </summary>
private static readonly (bool refHandling, bool internString, bool metadata, bool idTracking) _attrFlags
= ScanAcBinaryAttributeFlags();
private static (bool refHandling, bool internString, bool metadata, bool idTracking) ScanAcBinaryAttributeFlags()
{
var attrs = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(a => { try { return a.GetTypes(); } catch { return Array.Empty<Type>(); } })
.Select(t => t.GetCustomAttribute<AcBinarySerializableAttribute>())
.Where(a => a != null)
.ToList();
if (attrs.Count == 0) return (false, false, false, false);
return (
refHandling: attrs.All(a => a!.EnableRefHandlingFeature),
internString: attrs.All(a => a!.EnableInternStringFeature),
metadata: attrs.All(a => a!.EnableMetadataFeature),
idTracking: attrs.All(a => a!.EnableIdTrackingFeature));
}
/// <summary>
/// Common Options-column formatter for every AcBinary serializer benchmark row. Renders the
/// configured options-level value AND the effective attribute-level enable flag side-by-side
/// (e.g. <c>Interning=All(opt) | False (attr)</c>) so attribute-suppressed features cannot
/// silently mislead. Pass any benchmark-specific extras (e.g. <c>", BufferSize=4096B"</c>)
/// in <paramref name="extra"/> — they are appended after the common fields.
/// </summary>
private static string BuildAcBinaryOptionsDescription(AcBinarySerializerOptions options, string extra = "")
{
return $"WireMode={options.WireMode}, " +
$"RefHandling={options.ReferenceHandling}(opt) | {_attrFlags.refHandling} (attr), " +
$"Interning={options.UseStringInterning}(opt) | {_attrFlags.internString} (attr), " +
$"Metadata={options.UseMetadata}(opt) | {_attrFlags.metadata} (attr), " +
$"SGen={options.UseGeneratedCode}, " +
$"Compression={options.UseCompression}{extra}";
}
#if DEBUG
private const int WarmupIterations = 0;
private const int TestIterations = 1;
private const int BenchmarkSamples = 1; // Debug: single sample, fast iteration
#else
private static int WarmupIterations = 1000; //5000
private static int WarmupIterations = 5000; //5000
private static int TestIterations = 1000; //1000
private static int BenchmarkSamples = 3;
#endif
@ -83,16 +132,21 @@ public static class Program
// Done early so user is told immediately, not after warmup.
ValidateMemoryPackSetup();
// Determine layer (which test data to run) and opMode (ser/des/all).
// Determine layer (which test data to run), opMode (ser/des/all), and serializerMode (standard/asyncpipe).
// CLI args take precedence; if no args, show interactive menu.
// serializerMode: "standard" = all serializers EXCEPT AsyncPipe; "asyncpipe" = ONLY the AsyncPipe streaming benchmark.
// The two are mutually exclusive — AsyncPipe never runs alongside the standard set, so its long-lived pipe
// setup / kernel-buffer overhead does not skew the steady-state Byte[] / IBufferWriter measurements.
string layer;
var opMode = "all";
var serializerMode = "standard";
if (args.Length == 0)
{
var selection = ShowInteractiveMenu();
if (selection == null) return; // user pressed Q
layer = selection;
layer = selection.Value.layer;
serializerMode = selection.Value.serializerMode;
}
else
{
@ -117,6 +171,12 @@ public static class Program
{
layer = arg;
}
else if (arg is "asyncpipe" or "pipe")
{
// AsyncPipe-only mode: streaming I/O isolation across all test data.
layer = "all";
serializerMode = "asyncpipe";
}
else if (arg is "ser" or "serialize")
{
opMode = "serialize";
@ -137,11 +197,12 @@ public static class Program
System.Console.WriteLine("╔══════════════════════════════════════════════════════════════════════╗");
System.Console.WriteLine("║ COMPREHENSIVE SERIALIZER BENCHMARK SUITE ║");
System.Console.WriteLine("╚══════════════════════════════════════════════════════════════════════╝");
var allResults = new List<BenchmarkResult>();
var allTestDataSets = BenchmarkTestDataProvider.CreateTestDataSets();
var testDataSets = FilterByLayer(allTestDataSets, layer);
System.Console.WriteLine($"Layer: {layer} | OpMode: {opMode} | Iterations: {TestIterations} | Warmup: {WarmupIterations} | Samples: {BenchmarkSamples} (median)");
System.Console.WriteLine($"Layer: {layer} | OpMode: {opMode} | SerializerMode: {serializerMode} | Iterations: {TestIterations} | Warmup: {WarmupIterations} | Samples: {BenchmarkSamples} (median)");
System.Console.WriteLine($"Build: {BuildConfiguration} | .NET: {Environment.Version} | Test Type: {testDataSets.FirstOrDefault()?.TypeName ?? "unknown"} | Test Cells: {testDataSets.Count}/{allTestDataSets.Count}");
System.Console.WriteLine();
@ -157,7 +218,7 @@ public static class Program
System.Console.WriteLine($"Global JIT pre-warmup ({testDataSets.Count} cells × all serializers, light pass)...");
foreach (var testData in testDataSets)
{
var preSerializers = CreateSerializers(testData);
var preSerializers = CreateSerializers(testData, serializerMode);
try
{
foreach (var s in preSerializers)
@ -184,7 +245,7 @@ public static class Program
System.Console.WriteLine($"TEST DATA: {testData.DisplayName}");
System.Console.WriteLine($"{'═'.ToString().PadRight(70, '═')}");
var results = RunBenchmarksForTestData(testData, opMode);
var results = RunBenchmarksForTestData(testData, opMode, serializerMode);
allResults.AddRange(results);
}
@ -215,6 +276,7 @@ public static class Program
options.UseStringInterning = StringInterningMode.None;
var bytes = AcBinarySerializer.Serialize(order, options);
// Warmup (fills caches)
System.Console.WriteLine("Warming up (1000 iterations)...");
for (var i = 0; i < 1000; i++)
@ -247,16 +309,17 @@ public static class Program
System.Console.WriteLine(">>> ATTACH MEMORY PROFILER NOW <<<");
System.Console.WriteLine("Press any key to exit...");
System.Console.ReadKey(intercept: true);
System.Console.WriteLine();
System.Console.WriteLine("✓ Profiler mode complete. Exiting now.");
}
#region Benchmark Execution
private static List<BenchmarkResult> RunBenchmarksForTestData(TestDataSet testData, string mode)
private static List<BenchmarkResult> RunBenchmarksForTestData(TestDataSet testData, string mode, string serializerMode)
{
var results = new List<BenchmarkResult>();
var serializers = CreateSerializers(testData);
var serializers = CreateSerializers(testData, serializerMode);
// Round-trip correctness check — once per (cell × serializer), BEFORE warmup. Aborts the entire benchmark on failure.
System.Console.WriteLine("Verifying round-trip correctness...");
@ -322,11 +385,13 @@ public static class Program
// Dedicated alloc-only sample (separate from timing samples; keeps timing pure)
result.SerializeAllocBytesPerOp = MeasureAllocation(() => serializer.Serialize(), TestIterations);
}
if (mode is "all" or "deserialize" or "des")
{
result.DeserializeTimeMs = RunTimed(() => serializer.Deserialize(), TestIterations);
result.DeserializeAllocBytesPerOp = MeasureAllocation(() => serializer.Deserialize(), TestIterations);
}
// Compose RT from Ser+Des (the previously computed property's behavior, now explicit since RT is settable).
result.RoundTripTimeMs = result.SerializeTimeMs + result.DeserializeTimeMs;
result.RoundTripAllocBytesPerOp = result.SerializeAllocBytesPerOp + result.DeserializeAllocBytesPerOp;
@ -343,8 +408,32 @@ public static class Program
return results;
}
private static List<ISerializerBenchmark> CreateSerializers(TestDataSet testData)
private static List<ISerializerBenchmark> CreateSerializers(TestDataSet testData, string serializerMode)
{
// AsyncPipe-only mode — return ONLY the AsyncPipe streaming benchmark (no other serializer).
// Streaming I/O has long-lived pipe setup + kernel-buffer overhead that, when interleaved with
// the standard byte-array / IBufferWriter measurements, masks the steady-state numbers. Run it
// in isolation so the timing numbers reflect ONLY the streaming path.
if (serializerMode == "asyncpipe")
{
// NamedPipe — pipe-aligned chunk size for the long-lived IPC scenario. The chunkSize here
// drives the AsyncPipeWriterOutput's chunk-on-wire size (header + data, page-aligned thanks to
// the AcquireChunk fix) AND the kernel pipe buffer size (inBufferSize/outBufferSize on the
// NamedPipeServerStream ctor). Same value across both layers = one WriteFile(chunkSize) syscall
// fits blocking-free in one kernel pipe-buffer slot. Single source of truth for both app-level
// wire chunk AND kernel transfer unit; change ONLY this line when tuning.
var binaryFastModePipeChunkOnly = AcBinarySerializerOptions.FastMode;
binaryFastModePipeChunkOnly.BufferWriterChunkSize = 16_384; //AsyncPipeWriterOutput.MaxChunkSize;
return new List<ISerializerBenchmark>
{
new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunkOnly, "FastMode (PipeChunk)"),
};
}
// Standard mode — all serializers EXCEPT AsyncPipe (the streaming benchmark is opt-in via the
// AsyncPipe menu / CLI mode, never bundled with the steady-state suite).
var binaryNoInternOption = AcBinarySerializerOptions.Default;
binaryNoInternOption.UseStringInterning = StringInterningMode.None;
@ -354,20 +443,17 @@ public static class Program
var binaryFastModeNoSgenOption = AcBinarySerializerOptions.FastMode;
binaryFastModeNoSgenOption.UseGeneratedCode = false;
// Pipe-aligned max chunk size for the IBufferWriter / NamedPipe variants — matches
// AsyncPipeWriterOutput.MaxChunkSize (UINT16 max = 65535), the largest payload that fits in one
// [201][UINT16][data] wire frame. The same value also drives the kernel pipe buffer in the
// NamedPipeServerStream ctor (inBufferSize/outBufferSize) so the app-level chunk and the
// kernel-level transfer unit stay in sync — one WriteFile(chunkSize) syscall fits blocking-free in
// one kernel pipe-buffer slot, eliminating the page-segmentation in-syscall stall that plagued
// the previous 4 KB profile (where a 65 KB user-space chunk would still get sliced 16× inside
// the kernel because the default kernel pipe buffer is page-sized).
// Centralised here so ALL pipe-style benchmarks (BufWr new, NamedPipe) share a single source of
// truth — change ONLY THIS line when tuning the pipe chunk size, never inside individual benchmark
// ctors. Earlier 4 KB-baseline measurements remain comparable via the archived .LLM logs in
// Test_Benchmark_Results/Benchmark/.
var binaryFastModePipeChunk = AcBinarySerializerOptions.FastMode;
//AsyncPipeWriterOutput.MaxChunkSize;
// BufWr new — 4 KB chunk size for the FRESH ArrayBufferWriter scenario. The chunkSize here drives
// the serializer's GetSpan(N) request → the ArrayBufferWriter's internal allocation per call.
// Small chunk = small per-call allocation, optimum for one-shot serialization where each iteration
// allocates a fresh ABW. Independent of the AsyncPipe profile (different mechanism: alloc overhead
// vs syscall count).
var binaryFastModeBufWrChunk = AcBinarySerializerOptions.FastMode;
binaryFastModeBufWrChunk.BufferWriterChunkSize = 4096;
var defaultOptions = AcBinarySerializerOptions.Default;
defaultOptions.UseStringInterning = StringInterningMode.None;
defaultOptions.ReferenceHandling = ReferenceHandlingMode.OnlyId;
return new List<ISerializerBenchmark>
{
@ -379,7 +465,11 @@ public static class Program
// Fastest Byte[] — Runtime path (UseGeneratedCode=false). Same wire/options, no source-generated dispatch.
// Always paired with the SGen variant so every layer can compare the SGen speed-up apples-to-apples.
new AcBinaryBenchmark(testData.Order, binaryFastModeNoSgenOption, "FastMode"),
//new AcBinaryBenchmark(testData.Order, AcBinarySerializerOptions.Default, "Default"),
// Default preset Byte[] — RefHandling=OnlyId (deduplicates IId-shared references on the wire) +
// UseStringInterning=All (deduplicates repeated strings). Showcases the Default preset's wire-size
// and CPU trade-off vs FastMode on the ~20% IId-ref / repeated-string test data.
new AcBinaryBenchmark(testData.Order, defaultOptions, "Default"),
//new AcBinaryBenchmark(testData.Order, binaryDefaultNoSgenOption, "Default"),
//new AcBinaryBenchmark(testData.Order, AcBinarySerializerOptions.WithoutReferenceHandling, "NoRef"),
//new AcBinaryBenchmark(testData.Order, binaryNoInternOption, "NoIntern"),
@ -388,17 +478,12 @@ public static class Program
new AcBinaryBufferWriterBenchmark(testData.Order, AcBinarySerializerOptions.FastMode, "FastMode"),
// AcBinary via IBufferWriter (FRESH ArrayBufferWriter per call — one-shot scenario).
// PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method).
new AcBinaryFreshBufferWriterBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"),
// 4 KB chunk size from binaryFastModeBufWrChunk — minimises the per-call ArrayBufferWriter
// allocation. Optimum for this scenario.
new AcBinaryFreshBufferWriterBenchmark(testData.Order, binaryFastModeBufWrChunk, "FastMode (4KB BufWr)"),
// AcBinary over a long-lived NamedPipe IPC connection — pipe set up ONCE, reused for every iteration.
// PipeChunk size from the centralised binaryFastModePipeChunk options instance (see top of method) —
// same value drives BOTH the app-level wire chunk AND the kernel pipe buffer (inBufferSize/outBufferSize
// in the NamedPipeServerStream ctor). Persistent connection + multi-message wire framing + max-size
// chunks aligned with the kernel transfer unit. Single-process loopback, so the number is a lower bound
// (real cross-process / cross-machine adds transport latency on top). Result row: full round-trip shown
// in RT ms, Ser/Des = N/A (IsRoundTripOnly).
new AcBinaryNamedPipeBenchmark(testData.Order, binaryFastModePipeChunk, "FastMode (PipeChunk)"),
// AsyncPipe streaming (AcBinaryNamedPipeBenchmark) is intentionally OMITTED here — run it via
// the dedicated AsyncPipe menu / CLI mode for isolated streaming-I/O measurements.
// ============================================================
// MemoryPack — three I/O modes for apples-to-apples comparison
@ -431,6 +516,7 @@ public static class Program
// Single-sample fast path (Debug or trivial run) — no allocation, no sort.
var sw = Stopwatch.StartNew();
for (var i = 0; i < iterations; i++) action();
sw.Stop();
return sw.Elapsed.TotalMilliseconds;
}
@ -440,14 +526,14 @@ public static class Program
{
var sw = Stopwatch.StartNew();
for (var i = 0; i < iterations; i++) action();
sw.Stop();
times[s] = sw.Elapsed.TotalMilliseconds;
}
Array.Sort(times);
// Median: middle value for odd sample counts, average of two middles for even counts.
return samples % 2 == 1
? times[samples / 2]
: (times[samples / 2 - 1] + times[samples / 2]) / 2.0;
return samples % 2 == 1 ? times[samples / 2] : (times[samples / 2 - 1] + times[samples / 2]) / 2.0;
}
/// <summary>
@ -458,8 +544,10 @@ public static class Program
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
var before = GC.GetAllocatedBytesForCurrentThread();
for (var i = 0; i < iterations; i++) action();
var after = GC.GetAllocatedBytesForCurrentThread();
return (after - before) / iterations;
}
@ -477,8 +565,10 @@ public static class Program
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
var before = GC.GetTotalAllocatedBytes(precise: true);
for (var i = 0; i < iterations; i++) action();
var after = GC.GetTotalAllocatedBytes(precise: true);
return (after - before) / iterations;
}
@ -486,6 +576,7 @@ public static class Program
private static readonly JsonSerializerOptions VerifyJsonOpts = new()
{
WriteIndented = false,
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
ReferenceHandler = System.Text.Json.Serialization.ReferenceHandler.IgnoreCycles
};
@ -498,8 +589,10 @@ public static class Program
{
if (a == null && b == null) return true;
if (a == null || b == null) return false;
var jsonA = JsonSerializer.Serialize(a, VerifyJsonOpts);
var jsonB = JsonSerializer.Serialize(b, VerifyJsonOpts);
return jsonA == jsonB;
}
@ -510,6 +603,7 @@ public static class Program
private static void ValidateMemoryPackSetup()
{
var typesToCheck = new[] { typeof(TestOrder) };
foreach (var type in typesToCheck)
{
var hasAttr = type.GetCustomAttributes(typeof(MemoryPackableAttribute), inherit: true).Any();
@ -517,6 +611,7 @@ public static class Program
{
System.Console.Error.WriteLine($"❌ FATAL: {type.FullName} is not [MemoryPackable] — MemoryPack would fall back to runtime resolver, comparison is INVALID for SGen-vs-SGen claim.");
System.Console.Error.WriteLine("Add [MemoryPackable] to the type and any nested types referenced from it.");
Environment.Exit(1);
}
}
@ -525,7 +620,7 @@ public static class Program
/// <summary>
/// Interactive menu shown when no CLI args. Returns the layer keyword (core/comprehensive/edge/all) or null on Quit.
/// </summary>
private static string? ShowInteractiveMenu()
private static (string layer, string serializerMode)? ShowInteractiveMenu()
{
System.Console.WriteLine();
System.Console.WriteLine("╔══════════════════════════════════════════════════════════╗");
@ -538,18 +633,22 @@ public static class Program
System.Console.WriteLine(" [2] Comprehensive — release validation");
System.Console.WriteLine(" [3] Edge cases — refactor verification");
System.Console.WriteLine(" [A] All layers");
System.Console.WriteLine(" [P] AsyncPipe — streaming I/O isolation (only AsyncPipe, all test data)");
System.Console.WriteLine(" [Q] Quit");
System.Console.Write("\nSelection: ");
var key = System.Console.ReadKey(intercept: false).KeyChar;
System.Console.WriteLine();
return char.ToLower(key) switch
{
'1' => "core",
'2' => "comprehensive",
'3' => "edge",
'a' => "all",
'1' => ("core", "standard"),
'2' => ("comprehensive", "standard"),
'3' => ("edge", "standard"),
'a' => ("all", "standard"),
'p' => ("all", "asyncpipe"),
'q' => null,
_ => "core"
_ => ("all", "standard")
};
}
@ -567,8 +666,6 @@ public static class Program
// P3 will add: "ColdStart", "VeryLarge", "PathologicalString", etc.
var edgeExtras = new string[] { /* P3 */ };
bool StartsWithAny(string name, string[] prefixes) => prefixes.Any(p => name.StartsWith(p));
return layer switch
{
"core" => all.Where(t => StartsWithAny(t.Name, coreNames)).ToList(),
@ -576,6 +673,8 @@ public static class Program
"edge" => all.Where(t => StartsWithAny(t.Name, coreNames) || StartsWithAny(t.Name, comprehensiveExtras) || StartsWithAny(t.Name, edgeExtras)).ToList(),
_ => all.ToList()
};
static bool StartsWithAny(string name, string[] prefixes) => prefixes.Any(name.StartsWith);
}
#endregion
@ -622,7 +721,7 @@ public static class Program
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}";
public string OptionsDescription => BuildAcBinaryOptionsDescription(_options);
public AcBinaryBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
@ -783,7 +882,7 @@ public static class Program
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B";
public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B");
public AcBinaryFreshBufferWriterBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
@ -881,7 +980,7 @@ public static class Program
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes => 0;
public bool IsRoundTripOnly => true;
public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}, BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)";
public string OptionsDescription => BuildAcBinaryOptionsDescription(_options, $", BufferSize={_options.BufferWriterChunkSize}B, Transport=NamedPipe(long-lived,multiMessage)");
public AcBinaryNamedPipeBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{
@ -1047,7 +1146,7 @@ public static class Program
public string OptionsPreset { get; }
public int SerializedSize => _serialized.Length;
public long SetupAllocBytes { get; }
public string OptionsDescription => $"WireMode={_options.WireMode}, RefHandling={_options.ReferenceHandling}, Interning={_options.UseStringInterning}, Metadata={_options.UseMetadata}, SGen={_options.UseGeneratedCode}, Compression={_options.UseCompression}";
public string OptionsDescription => BuildAcBinaryOptionsDescription(_options);
public AcBinaryBufferWriterBenchmark(TestOrder order, AcBinarySerializerOptions options, string optionsPreset)
{

View File

@ -47,7 +47,7 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
return new TestDataSet("Small (2x2x2x2)", order, iidRefPercent: 10);
return new TestDataSet("Small (2x2x2x2)", order, iidRefPercent: 20);
}
private static TestDataSet CreateMediumTestData(bool resetId = true)
@ -79,7 +79,7 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
return new TestDataSet("Medium (3x3x3x4)", order, iidRefPercent: 10);
return new TestDataSet("Medium (3x3x3x4)", order, iidRefPercent: 20);
}
private static TestDataSet CreateLargeTestData(bool resetId = true)
@ -109,7 +109,7 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
return new TestDataSet("Large (5x5x5x10)", order, iidRefPercent: 10);
return new TestDataSet("Large (5x5x5x10)", order, iidRefPercent: 20);
}
private static TestDataSet CreateRepeatedStringsTestData(bool resetId = true)
@ -137,15 +137,23 @@ public static class BenchmarkTestDataProvider
sharedUser: sharedUser,
sharedPreferences: sharedPreferences);
// Repeated string fields — ProductName on items + PalletCode on pallets. Both are common
// across the hierarchy, exercising string-interning deduplication on the Default preset
// (which has UseStringInterning = All). Targeting ~20% repeated-string share overall.
foreach (var item in order.Items)
{
item.Status = TestStatus.Processing;
item.ProductName = "CommonProductName_RepeatedForTesting";
foreach (var pallet in item.Pallets)
{
pallet.PalletCode = "CommonPalletCode_RepeatedForTesting";
}
}
ClearDeepLevelRefs(order);
return new TestDataSet("Repeated Strings (10 items)", order, iidRefPercent: 10);
return new TestDataSet("Repeated Strings (10 items)", order, iidRefPercent: 20);
}
private static TestDataSet CreateDeepNestedTestData(bool resetId = true)
@ -177,17 +185,21 @@ public static class BenchmarkTestDataProvider
ClearDeepLevelRefs(order);
return new TestDataSet("Deep Nested (2x4x4x8)", order, iidRefPercent: 10);
return new TestDataSet("Deep Nested (2x4x4x8)", order, iidRefPercent: 20);
}
private static void ClearDeepLevelRefs(TestOrder order)
{
// Keep shared IId refs at the pallet level (Tag + Inspector) — these contribute the bulk of
// the ~20% IId-ref share that the test data targets. Only Category is cleared at this level
// (one-of-three clears keep the share moderate). The deeper measurement / point levels are
// cleared entirely so deep-tree ref noise does not skew the share upward beyond ~20%.
foreach (var item in order.Items)
{
foreach (var pallet in item.Pallets)
{
pallet.Tag = null;
pallet.Inspector = null;
// pallet.Tag = null; // KEEP for ~20% IId-ref share (was cleared)
// pallet.Inspector = null; // KEEP for ~20% IId-ref share (was cleared)
pallet.Category = null;
foreach (var measurement in pallet.Measurements)

View File

@ -55,7 +55,7 @@ public enum TestUserRole
/// Implements IId&lt;int&gt; for semantic $id/$ref serialization.
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class SharedTag : IId<int>
{
@ -80,7 +80,7 @@ public partial class SharedTag : IId<int>
/// Shared category - for hierarchical cross-reference testing.
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class SharedCategory : IId<int>
{
@ -106,7 +106,7 @@ public partial class SharedCategory : IId<int>
/// Shared user reference - appears in many places to test $ref deduplication.
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class SharedUser : IId<int>
{
@ -136,7 +136,7 @@ public partial class SharedUser : IId<int>
/// User preferences - non-IId nested object
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class UserPreferences
{
@ -162,7 +162,7 @@ public partial class UserPreferences
/// Does NOT implement IId, so uses standard Newtonsoft reference tracking.
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class MetadataInfo
{
@ -190,7 +190,7 @@ public partial class MetadataInfo
/// Level 1: Main order - root of the hierarchy
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestOrder : IId<int>
{
@ -249,7 +249,7 @@ public partial class TestOrder : IId<int>
/// <summary>
/// Level 1: Main order - root of the hierarchy
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public partial class TestOrder_Circ_Ref : IId<int>
{
public int Id { get; set; }
@ -284,7 +284,7 @@ public partial class TestOrder_Circ_Ref : IId<int>
/// Level 2: Order item with pallets
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestOrderItem : IId<int>
{
@ -323,7 +323,7 @@ public partial class TestOrderItem : IId<int>
/// <summary>
/// Level 2: Order item with pallets
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public partial class TestOrderItem_Circ_Ref : IId<int>
{
public int Id { get; set; }
@ -347,7 +347,7 @@ public partial class TestOrderItem_Circ_Ref : IId<int>
/// Level 3: Pallet containing measurements
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestPallet : IId<int>
{
@ -390,7 +390,7 @@ public partial class TestPallet : IId<int>
/// Level 4: Measurement with multiple points
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestMeasurement : IId<int>
{
@ -425,7 +425,7 @@ public partial class TestMeasurement : IId<int>
/// Level 5: Deepest level - measurement point
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
[MessagePackObject]
public partial class TestMeasurementPoint : IId<int>
{
@ -459,7 +459,7 @@ public partial class TestMeasurementPoint : IId<int>
/// <summary>
/// Order with Guid Id - for testing Guid-based IId
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class TestGuidOrder : IId<Guid>
{
public Guid Id { get; set; }
@ -471,7 +471,7 @@ public class TestGuidOrder : IId<Guid>
/// <summary>
/// Item with Guid Id
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class TestGuidItem : IId<Guid>
{
public Guid Id { get; set; }
@ -487,7 +487,7 @@ public class TestGuidItem : IId<Guid>
/// Simulates NopCommerce GenericAttribute - stores key-value pairs where DateTime values
/// are stored as strings in the database.
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class TestGenericAttribute
{
public int Id { get; set; }
@ -499,7 +499,7 @@ public class TestGenericAttribute
/// DTO with GenericAttributes collection - simulates OrderDto with string-stored DateTime values.
/// This reproduces the production bug where Binary serialization was thought to corrupt DateTime strings.
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class TestDtoWithGenericAttributes : IId<int>
{
public int Id { get; set; }
@ -510,7 +510,7 @@ public class TestDtoWithGenericAttributes : IId<int>
/// <summary>
/// Order with nullable collections for null vs empty testing
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class TestOrderWithNullableCollections
{
public int Id { get; set; }
@ -523,7 +523,7 @@ public class TestOrderWithNullableCollections
/// Class with all primitive types for WASM/serialization testing
/// </summary>
[MemoryPackable]
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public partial class PrimitiveTestClass
{
public int IntValue { get; set; }
@ -546,7 +546,7 @@ public partial class PrimitiveTestClass
/// Class with extended primitive types for full serializer coverage.
/// Includes DateTimeOffset, TimeSpan, Dictionary, null properties.
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class ExtendedPrimitiveTestClass
{
public int Id { get; set; }
@ -576,7 +576,7 @@ public class ExtendedPrimitiveTestClass
/// <summary>
/// Class with array of objects containing null items for WriteNull coverage
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class ObjectWithNullItems
{
public int Id { get; set; }
@ -591,7 +591,7 @@ public class ObjectWithNullItems
/// "Server-side" DTO with extra properties that the "client" doesn't know about.
/// Used to test SkipValue functionality when deserializing unknown properties.
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class ServerCustomerDto : IId<int>
{
public int Id { get; set; }
@ -624,7 +624,7 @@ public class ServerCustomerDto : IId<int>
/// the deserializer must skip unknown properties correctly
/// while still maintaining string intern table consistency.
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class ClientCustomerDto : IId<int>
{
public int Id { get; set; }
@ -638,7 +638,7 @@ public class ClientCustomerDto : IId<int>
/// Server DTO with nested objects that client doesn't know about.
/// Tests skipping complex nested structures.
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class ServerOrderWithExtras : IId<int>
{
public int Id { get; set; }
@ -659,7 +659,7 @@ public class ServerOrderWithExtras : IId<int>
/// <summary>
/// Client version of the order - doesn't have Customer/RelatedCustomers properties.
/// </summary>
[AcBinarySerializable(false)]
[AcBinarySerializable(false, true, false, false)]
public class ClientOrderSimple : IId<int>
{
public int Id { get; set; }

View File

@ -301,6 +301,7 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
ArrayPool<byte>.Shared.Return(_ownedBuffer);
_ownedBuffer = null;
}
_hasOwnedBuffer = false;
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,140 @@
# AsyncPipe — TODO
Streaming I/O layer for the binary serializer. Open / resolved issues this work targets are in
[`BINARY_ASYNCPIPE_ISSUES.md`](BINARY_ASYNCPIPE_ISSUES.md).
## Priority legend
- **P0** blocker · **P1** important · **P2** nice-to-have · **P3** idea
## ACCORE-BIN-T-D6H4: Create `AsyncPipeReaderInput` class (Step 1 of ADR-0003)
**Priority:** P1 · **Type:** Refactor · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 1
Add new `sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs`. Self-contained sliding-window buffer (`byte[]` + `_writePos` + `_readPos` + `_completed` + `ManualResetEventSlim`) with reset-to-0 cycling preserved verbatim from today's `SegmentBufferReader`. Producer API: `Feed(ReadOnlySpan<byte>)`, `Complete()`. Consumer API (IBinaryInputBase): `Initialize` / `TryAdvanceSegment` / `Release`.
Existing `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` remain unchanged in this step — they keep serving the SignalR `AcBinaryHubProtocol.TryParseChunkData` path. Migration to the new class is in Step 6 (`ACCORE-SBP-T-G7T2`).
**Naming rationale:** `AsyncPipeReaderInput` mirrors the existing send-side `AsyncPipeWriterOutput`. The `Async` prefix follows .NET BCL convention for type-level naming (`AsyncEnumerable`, `IAsyncDisposable`, `AsyncLocal<T>`).
**Acceptance:**
- New class compiles; isolated unit tests cover `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts (incl. producer-consumer concurrency, missed-signal double-check, grow-buffer handoff race).
- Existing SignalR tests continue to pass on the unchanged `SegmentBufferReader` path (no behavioral regression).
## ACCORE-BIN-T-M2K1: Add `AsyncPipeReaderInput.DrainFromAsync` extension (Step 2 of ADR-0003)
**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 2 · **Depends on:** `ACCORE-BIN-T-D6H4`
Add `public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken ct)` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs` (NEW file). Pulls from a `System.IO.Pipelines.PipeReader` and feeds the input via repeated `Feed(span)` calls; calls `Complete()` at end-of-stream.
Separate file (not a method on the class) so the core `AsyncPipeReaderInput.cs` does not import `System.IO.Pipelines` in its primary contract — the pull-mode is opt-in at use-sites.
**Acceptance:**
- Extension drains an in-memory `Pipe` end-to-end in a unit test (write some bytes → DrainFromAsync → assert AsyncPipeReaderInput state).
- `Complete()` correctly invoked at end-of-stream (`result.IsCompleted`).
## ACCORE-BIN-T-V7C9: Replace misleading parallel test with real parallel pipeline test (Step 3 of ADR-0003)
**Priority:** P1 · **Type:** Test · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 3 · **Depends on:** `ACCORE-BIN-T-M2K1`
The current `AcBinarySerializerPipeParallelTests.cs` is misleading — it does not actually exercise serializer↔deserializer parallelism (single-threaded in practice). Rewrite to drive a producer thread (serializer) and a consumer thread (deserializer) through an in-memory `Pipe`, with `AsyncPipeReaderInput.DrainFromAsync` on the receive side. Measure ser+deser overlap and verify the ADR-0003 claimed ~1 µs / MB perf delta vs today's struct-based path.
**Acceptance:**
- Test passes consistently on Windows + Linux CI.
- Measured perf delta documented in test output / commit message.
- Test serves as regression guard for future receive-side changes (no silent perf-cliff regression goes undetected).
## ACCORE-BIN-T-A3T8: Add NamedPipe helpers — `SerializeToNamedPipeAsync` / `DeserializeFromNamedPipeAsync` (Step 4 of ADR-0003)
**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 4 · **Depends on:** `ACCORE-BIN-T-V7C9`
Add static extension methods on `AcBinarySerializerOptions` for full NamedPipe IPC lifecycle (one-shot send / receive). New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerNamedPipeExtensions.cs`. Send: `NamedPipeServerStream``PipeWriter.Create(stream)``AsyncPipeWriterOutput`. Receive: `NamedPipeClientStream``PipeReader.Create(stream)``AsyncPipeReaderInput.DrainFromAsync`.
Cross-platform: Windows + Linux (Unix-domain-socket via NamedPipe BCL API). WASM throws `PlatformNotSupportedException` per BCL contract.
**Acceptance:**
- Cross-platform integration test: roundtrip a complex object graph through a NamedPipe; assert structural equality.
- WASM build does not link these helpers (or throws clear PNS at runtime if invoked).
## ACCORE-BIN-T-B5Y6: Add FileStream helpers — `SerializeToFileStreamAsync` / `DeserializeFromFileStreamAsync` (Step 5 of ADR-0003)
**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 5 · **Depends on:** `ACCORE-BIN-T-A3T8`
Add static extension methods on `AcBinarySerializerOptions` for streaming file I/O. New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerFileStreamExtensions.cs`. Send: `FileStream.Create(path)``PipeWriter.Create(fileStream)``AsyncPipeWriterOutput`. Receive: `FileStream.OpenRead(path)``PipeReader.Create(fileStream)``AsyncPipeReaderInput.DrainFromAsync`.
**Critical streaming-doctrine invariant:** peak buffer memory bounded by `BufferWriterChunkSize × 2` (~8 KB at default), regardless of file size. **NOT file-size-aware** — do not pre-allocate to file size (would defeat streaming and break zerocopy / zeroalloc).
**Acceptance:**
- Large-file roundtrip test (≥ 100 MB) passes with memory profiler showing peak buffer ≤ 16 KB throughout.
- Full structural equality of round-tripped object.
## ACCORE-BIN-T-R5K2: Multi-message reuse for AsyncPipeReaderInput
**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported)
Reuse a long-lived `AsyncPipeReaderInput` across multiple `Deserialize<T>(input, opts)` calls without setting up a fresh instance per message. Removes the per-message ArrayPool rent + `ManualResetEventSlim` allocation + two `Task.Run` calls that the canonical fresh-instance pattern requires today, opening a true zero-alloc-per-message path on long-lived raw IPC transports.
**Possible directions** (none fixed; pre-design):
- **`Initialize` emits `_readPos` as starting position** (instead of always 0), and the sliding-window reset becomes "anytime `_readPos > 0` after a `Deserialize` completes, reset both `_writePos` and `_readPos` to 0". Smallest API change, no public surface added. Caveat: requires the deserializer to call `TryAdvanceSegment` at least once during message read so `_readPos` reflects the consumed boundary — small fully-buffered messages currently skip it entirely.
- **New `SetReadCursor(int position)` / `AdvanceReadTo(int position)` method**: caller (deserializer or wrapper) reports the consumed offset after each `Deserialize`. Sliding-window reset triggers explicitly. Cleaner separation of concerns (consumer knows where it stopped), but adds a public API surface.
- **`ResetCompletion()` for framed mode**: orthogonal to the cursor design — needed for framed multi-message reuse where the `[202]` CHUNK_END marker currently makes `_completed = true` irreversible. Combine with whichever cursor design is chosen.
- **Consumer-thread atomic position-reset on message-done**: a `MessageDone()`-style method called from the consumer (typically the deserializer's finally-block) atomically resets BOTH `_writePos` AND `_readPos` to 0 on the calling thread BEFORE the next `Initialize` runs. Trade-off: relies on the calling thread being the consumer; multi-thread feed+deserialize patterns need different coordination.
**Acceptance** (for whichever direction is picked):
- New tests exercise `N` consecutive `Deserialize<T>(sharedInput, opts)` calls on the same instance, both raw and framed modes, with payload sizes both above and below the initial buffer capacity. All `N` results match their respective inputs (no buffer-position aliasing, no message-#1-duplicate-on-#2 regression).
- Existing single-message tests continue to pass.
- Wire format unchanged (consumer-side reader plumbing, not a wire-level change).
- Allocation profile of `N` consecutive reads on the shared input: **0 bytes per call after warmup**.
## ACCORE-BIN-T-V4Q9: Convenience API for the strictly-sequential single-thread pattern
**Priority:** P3 · **Type:** API · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface), [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported)
A higher-level wrapper for the typical single-thread sequential `Feed → Deserialize → reset` cycle. Possible directions (none committed; pre-design):
- **Delegate-based** API on the input: caller passes a deserializer lambda; the input encapsulates the lifecycle (`try` / `finally` + atomic reset). Stateful overload with `state`-parameter for zero-alloc on hot paths (avoids closure capture).
- **`IDisposable` scope object**: caller acquires a per-message scope; the scope's `Dispose` triggers reset.
- **Method-pair on the input**: explicit `BeginMessage` / `EndMessage` in the consumer's hands.
Trade-offs: delegate has runtime cost but is most ergonomic; scope-object adds one allocation per message but composes naturally with `using`; method-pair is most explicit but most error-prone (forget `EndMessage` → leak / corruption).
Implementation only after the related issues are documented and the multi-pattern catalogue exists (so the convenience API's scope is clear: it covers ONE pattern, not all).
## ACCORE-BIN-T-M7B3: Implicit session-end signal in `Dispose()`
**Priority:** P3 · **Type:** API · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-w6k4`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-w6k4-disposal-trap-idisposabledispose-does-not-guarantee-consumer-wakeup)
Possible directions to consider (none chosen):
- **Embed an idempotent session-end signal in `Dispose()`** before tearing down the wait-primitive and buffers. The plain `using` pattern would then suffice — explicit `Complete()` becomes optional. Trade-off: subtle behavioral shift; consumers relying on the "Dispose without prior Complete" path returning data may need to migrate.
- **Document the contract explicitly** without changing behavior: the public XML doc states the consumer-thread must be quiesced before disposal. Lowest-risk; relies on documentation.
- **Throw on `Dispose` if `Complete` was not called**: stricter contract, surfaces the trap as a clear error rather than `ObjectDisposedException` from the wait-primitive. Breaking change.
Pick a direction before implementing; each has different consumer-migration costs.
## ACCORE-BIN-T-K1F6: Public exposure of the wire-frame fixed overhead
**Priority:** P3 · **Type:** API
The wire-frame header byte-size (currently a private constant on `AsyncPipeWriterOutput`) is needed by consumers calculating transport-buffer sizing — kernel pipe buffer width on `NamedPipeServerStream`, `BufferedStream` wrapper size, etc. Possible directions:
- **Expose a public const** on `AsyncPipeWriterOutput` (e.g. `FrameHeaderSize`).
- **Add a static helper** that returns the chunk-on-wire size given a chunk-data size (or vice versa).
- **Document only**: state the framing overhead in the class XML doc and let consumers hard-code.
Smallest API-surface addition that solves the consumer-side calculation problem. Pick one direction before implementing.
## ACCORE-BIN-T-R8H4: Reset / reopen primitive for a stream after session-end
**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-d8l5`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-d8l5-session-end-is-irreversible-no-reset-reopen-primitive)
Possible directions for a failover / reconnect scenario (none committed):
- **`Reset()` method**: a one-method primitive that flips the internal session-end flag back to off and clears the buffer. Trade-off: thread-safety contract — must match the disposal contract; partially-completed framing-state must be explicitly reset.
- **Helper-factory wrapping the lifecycle**: a `using`-friendly factory that produces a fresh instance per session, hiding the irreversibility behind a higher-level primitive.
- **Document only**: state that fresh-instance allocation is the supported reconnect pattern; reuse is unsupported.
Each has different thread-safety implications and wire-formatting reentrancy requirements; explore before any code.
## ACCORE-BIN-T-S5N2: Pattern catalogue in the public class summary
**Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-j2x7-multi-pattern-documentation-gap-on-the-public-api-surface)
Extend the `AsyncPipeReaderInput` class summary with an explicit pattern catalogue: each supported producer-consumer pattern (strictly-sequential single-thread, multi-thread feed+deserialize, push/event-driven, one-shot) with its recommended API (low-level Feed/Initialize/MessageDone vs. any higher-level convenience wrapper). NuGet consumers can self-select the pattern that matches their threading model.
## ACCORE-BIN-T-G3W8: Transport-buffer alignment best-practice doc section
**Priority:** P3 · **Type:** Documentation · **Related:** [`BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2`](BINARY_ASYNCPIPE_ISSUES.md#accore-bin-i-h4g2-chunk-on-wire-size--chunksize--headersize-caused-page-fragmentation)
A documentation section covering the alignment of the user-space `chunkSize` and the transport-level kernel buffer (general principle, not transport-specific): page-aligned matching avoids fragmentation, mismatched sizes cause two kernel-page transfers per chunk. Reference benchmark archive(s) where empirical numbers exist.

File diff suppressed because one or more lines are too long

View File

@ -1,5 +1,7 @@
# AcBinarySerializer — TODO
This page covers planned work for the **binary serializer core** (format, SGen, options, deserialization context, buffer writer). Work specific to the **streaming I/O layer** (`AsyncPipeReaderInput` + `AsyncPipeWriterOutput`, multi-message wire framing, sliding-window buffer, producer-consumer synchronization) is tracked separately in [`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md).
## Priority legend
- **P0** blocker · **P1** important · **P2** nice-to-have · **P3** idea
@ -109,64 +111,8 @@ Implement the phase 1 runtime path of source→target projection serialization p
**Sibling rebrand-prep TODOs:** `ACCORE-BIN-T-Z3K8` (IId migration), `ACCORE-BIN-T-N7V1` (JsonIgnore replacement).
## ACCORE-BIN-T-D6H4: Create `AsyncPipeReaderInput` class (Step 1 of ADR-0003)
**Priority:** P1 · **Type:** Refactor · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 1
Add new `sealed class AsyncPipeReaderInput : IBinaryInputBase, IDisposable` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInput.cs`. Self-contained sliding-window buffer (`byte[]` + `_writePos` + `_readPos` + `_completed` + `ManualResetEventSlim`) with reset-to-0 cycling preserved verbatim from today's `SegmentBufferReader`. Producer API: `Feed(ReadOnlySpan<byte>)`, `Complete()`. Consumer API (IBinaryInputBase): `Initialize` / `TryAdvanceSegment` / `Release`.
Existing `SegmentBufferReader.cs` and `SegmentBufferReaderInput.cs` remain unchanged in this step — they keep serving the SignalR `AcBinaryHubProtocol.TryParseChunkData` path. Migration to the new class is in Step 6 (`ACCORE-SBP-T-G7T2`).
**Naming rationale:** `AsyncPipeReaderInput` mirrors the existing send-side `AsyncPipeWriterOutput`. The `Async` prefix follows .NET BCL convention for type-level naming (`AsyncEnumerable`, `IAsyncDisposable`, `AsyncLocal<T>`).
**Acceptance:**
- New class compiles; isolated unit tests cover `Feed` / `TryAdvanceSegment` / `Complete` / `Dispose` contracts (incl. producer-consumer concurrency, missed-signal double-check, grow-buffer handoff race).
- Existing SignalR tests continue to pass on the unchanged `SegmentBufferReader` path (no behavioral regression).
## ACCORE-BIN-T-M2K1: Add `AsyncPipeReaderInput.DrainFromAsync` extension (Step 2 of ADR-0003)
**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 2 · **Depends on:** `ACCORE-BIN-T-D6H4`
Add `public static async Task DrainFromAsync(this AsyncPipeReaderInput input, PipeReader reader, CancellationToken ct)` in `AyCode.Core/Serializers/Binaries/AsyncPipeReaderInputExtensions.cs` (NEW file). Pulls from a `System.IO.Pipelines.PipeReader` and feeds the input via repeated `Feed(span)` calls; calls `Complete()` at end-of-stream.
Separate file (not a method on the class) so the core `AsyncPipeReaderInput.cs` does not import `System.IO.Pipelines` in its primary contract — the pull-mode is opt-in at use-sites.
**Acceptance:**
- Extension drains an in-memory `Pipe` end-to-end in a unit test (write some bytes → DrainFromAsync → assert AsyncPipeReaderInput state).
- `Complete()` correctly invoked at end-of-stream (`result.IsCompleted`).
## ACCORE-BIN-T-V7C9: Replace misleading parallel test with real parallel pipeline test (Step 3 of ADR-0003)
**Priority:** P1 · **Type:** Test · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 3 · **Depends on:** `ACCORE-BIN-T-M2K1`
The current `AcBinarySerializerPipeParallelTests.cs` is misleading — it does not actually exercise serializer↔deserializer parallelism (single-threaded in practice). Rewrite to drive a producer thread (serializer) and a consumer thread (deserializer) through an in-memory `Pipe`, with `AsyncPipeReaderInput.DrainFromAsync` on the receive side. Measure ser+deser overlap and verify the ADR-0003 claimed ~1 µs / MB perf delta vs today's struct-based path.
**Acceptance:**
- Test passes consistently on Windows + Linux CI.
- Measured perf delta documented in test output / commit message.
- Test serves as regression guard for future receive-side changes (no silent perf-cliff regression goes undetected).
## ACCORE-BIN-T-A3T8: Add NamedPipe helpers — `SerializeToNamedPipeAsync` / `DeserializeFromNamedPipeAsync` (Step 4 of ADR-0003)
**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 4 · **Depends on:** `ACCORE-BIN-T-V7C9`
Add static extension methods on `AcBinarySerializerOptions` for full NamedPipe IPC lifecycle (one-shot send / receive). New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerNamedPipeExtensions.cs`. Send: `NamedPipeServerStream``PipeWriter.Create(stream)``AsyncPipeWriterOutput`. Receive: `NamedPipeClientStream``PipeReader.Create(stream)``AsyncPipeReaderInput.DrainFromAsync`.
Cross-platform: Windows + Linux (Unix-domain-socket via NamedPipe BCL API). WASM throws `PlatformNotSupportedException` per BCL contract.
**Acceptance:**
- Cross-platform integration test: roundtrip a complex object graph through a NamedPipe; assert structural equality.
- WASM build does not link these helpers (or throws clear PNS at runtime if invoked).
## ACCORE-BIN-T-B5Y6: Add FileStream helpers — `SerializeToFileStreamAsync` / `DeserializeFromFileStreamAsync` (Step 5 of ADR-0003)
**Priority:** P1 · **Type:** Feature · **Related ADR:** [`docs/adr/0003-acbinary-streaming-receive-architecture.md`](../../../docs/adr/0003-acbinary-streaming-receive-architecture.md) Step 5 · **Depends on:** `ACCORE-BIN-T-A3T8`
Add static extension methods on `AcBinarySerializerOptions` for streaming file I/O. New file `AyCode.Core/Serializers/Binaries/AcBinarySerializerFileStreamExtensions.cs`. Send: `FileStream.Create(path)``PipeWriter.Create(fileStream)``AsyncPipeWriterOutput`. Receive: `FileStream.OpenRead(path)``PipeReader.Create(fileStream)``AsyncPipeReaderInput.DrainFromAsync`.
**Critical streaming-doctrine invariant:** peak buffer memory bounded by `BufferWriterChunkSize × 2` (~8 KB at default), regardless of file size. **NOT file-size-aware** — do not pre-allocate to file size (would defeat streaming and break zerocopy / zeroalloc).
**Acceptance:**
- Large-file roundtrip test (≥ 100 MB) passes with memory profiler showing peak buffer ≤ 16 KB throughout.
- Full structural equality of round-tripped object.
## ACCORE-BIN-T-K3W7: Rename `BufferWriterChunkSize` to reflect actual semantics
**Priority:** P3 · **Type:** Refactor · **Breaking:** Yes (public option API)
**Priority:** P3 · **Type:** Refactor · **Breaking:** Yes (public option API) · **Streaming impact:** see [`BINARY_ASYNCPIPE_TODO.md`](BINARY_ASYNCPIPE_TODO.md) for the streaming-side companion considerations (chunk-on-wire vs internal-buffer semantics)
The property name `BufferWriterChunkSize` is misleading: across the three output paths it does NOT consistently represent a "chunk".
@ -202,21 +148,3 @@ Pick one before touching code. Option 2 is the most correct but adds API surface
- Wire format unchanged. Default values unchanged (65535 / equivalent).
- Migration note in CHANGELOG / release notes since this is a breaking change to `AcBinarySerializerOptions`.
## ACCORE-BIN-T-R5K2: Multi-message reuse for AsyncPipeReaderInput
**Priority:** P3 · **Type:** Feature · **Related:** [`BINARY_ISSUES.md#accore-bin-i-q4t8`](BINARY_ISSUES.md#accore-bin-i-q4t8-asyncpipereaderinput-multi-message-reuse-not-supported) — full Symptom / Root cause / Workarounds documented there; **do not duplicate here**.
Add a "next message" cursor / reset semantics so a long-lived `AsyncPipeReaderInput` can be reused across multiple `Deserialize<T>(input, opts)` calls without setting up a fresh instance per message. Removes the per-message ArrayPool rent + `ManualResetEventSlim` allocation + two `Task.Run` calls that the canonical pattern (`DeserializeFromPipeReaderAsync`) requires today, opening a true zero-alloc-per-message path on long-lived raw IPC transports (NamedPipe, FileStream, NetworkStream).
**Design candidates** (pick one — prototype first, measure the small-message zero-alloc claim before committing):
- **A. `Initialize` emits `_readPos` as starting position** (instead of always 0), and the sliding-window reset becomes "anytime `_readPos > 0` after a `Deserialize` completes, reset both `_writePos` and `_readPos` to 0". Smallest API change, no public surface added. Caveat: requires the deserializer to call `TryAdvanceSegment` at least once during message read so `_readPos` reflects the consumed boundary — small fully-buffered messages currently skip it entirely.
- **B. New `SetReadCursor(int position)` / `AdvanceReadTo(int position)` method**: caller (deserializer or wrapper) reports the consumed offset after each `Deserialize`. Sliding-window reset triggers explicitly. Cleaner separation of concerns (consumer knows where it stopped), but adds a public API surface.
- **C. `ResetCompletion()` for framed mode**: orthogonal to A/B — needed for framed multi-message reuse where the `[202]` CHUNK_END marker currently makes `_completed = true` irreversible. Combine with whichever cursor design is chosen.
**Acceptance:**
- New tests exercise `N` consecutive `Deserialize<T>(sharedInput, opts)` calls on the same instance, both raw and framed modes, with payload sizes both above and below the initial buffer capacity. All `N` results match their respective inputs (no buffer-position aliasing, no message-#1-duplicate-on-#2 regression).
- Existing `DeserializeFromPipeReaderAsync` unit tests continue to pass (single-message path unchanged).
- Wire format unchanged (this is consumer-side reader plumbing, not a wire-level change).
- Allocation profile of `N` consecutive reads on the shared input: **0 bytes per call after warmup** (ArrayPool rent reused across calls, no MRES per call, no `Task.Run` per call). The deserialized object graph allocations stay (those are user-visible).

File diff suppressed because one or more lines are too long