AyCode.Core/AyCode.Core.Tests/Serialization/AcBinarySerializerPipeParal...

730 lines
28 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.TestModels;
using System.IO;
using System.IO.Pipelines;
using static AyCode.Core.Tests.TestModels.AcSerializerModels;
namespace AyCode.Core.Tests.Serialization;
/// <summary>
/// Unit tests for <see cref="AsyncPipeReaderInput"/> (Step 1, ACCORE-BIN-T-D6H4) and the
/// <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/> extension (Step 2, ACCORE-BIN-T-M2K1),
/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9), plus runtime type-detect
/// sanity pinning (Step 4).
///
/// <para>Tests run with <see cref="AsyncPipeReaderInput"/>'s default <c>multiMessage = true</c> —
/// <see cref="AsyncPipeReaderInput.Feed"/> expects the AsyncSegment chunked wire format
/// <c>[201][UINT16 LE size][data]</c> per chunk, tolerates <c>[200]</c> CHUNK_START prefix, and
/// signals end-of-stream on <c>[202]</c> CHUNK_END. The <see cref="WrapInChunkFrame"/> helper
/// wraps test data into single chunk frames; multi-chunk tests concatenate multiple frames.</para>
///
/// <para>Wire format identical to <see cref="AsyncPipeWriterOutput"/> framed output and to
/// SignalR's <c>AcBinaryHubProtocol.TryParseChunkData</c> input — unified across all transports
/// per ADR-0003 §9.</para>
/// </summary>
[TestClass]
public class AcBinarySerializerPipeParallelTests
{
// ====================================================================
// Step 1 — AsyncPipeReaderInput contract (ACCORE-BIN-T-D6H4)
// ====================================================================
[TestMethod]
public void Feed_EmptyData_NoOp()
{
using var input = new AsyncPipeReaderInput(64);
input.Feed(ReadOnlySpan<byte>.Empty);
input.Complete();
// No data → TryAdvanceSegment returns false immediately
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(0, bufferLength);
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Feed_AppendsBytes_AccessibleViaTryAdvanceSegment()
{
using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
input.Feed(WrapInChunkFrame(data)); // [201][UINT16=8][1..8]
input.Complete();
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public void Initialize_BeforeFeed_ReturnsEmptyBuffer()
{
using var input = new AsyncPipeReaderInput(64);
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.IsNotNull(buffer);
Assert.AreEqual(0, position);
Assert.AreEqual(0, bufferLength);
}
[TestMethod]
public void Initialize_AfterFeed_ReturnsAvailableData()
{
using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 10, 20, 30 };
input.Feed(WrapInChunkFrame(data));
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(0, position);
Assert.AreEqual(3, bufferLength);
Assert.AreEqual((byte)10, buffer[0]);
Assert.AreEqual((byte)20, buffer[1]);
Assert.AreEqual((byte)30, buffer[2]);
}
[TestMethod]
public void Complete_AllConsumed_TryAdvanceSegmentReturnsFalse()
{
using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete();
// Simulate consumer that has read all 3 bytes
input.Initialize(out var buffer, out var position, out var bufferLength);
position = bufferLength;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Complete_WithLeftoverData_TryAdvanceSegmentReturnsTrueWithRemainder()
{
using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Feed(WrapInChunkFrame([4, 5, 6]));
input.Complete();
// Simulate consumer that has read 3 of 6 bytes — advance should expose the rest
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(6, bufferLength);
position = 3;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsTrue(hasMore);
Assert.AreEqual(3, position);
Assert.AreEqual(6, bufferLength);
Assert.AreEqual((byte)4, buffer[3]);
Assert.AreEqual((byte)5, buffer[4]);
Assert.AreEqual((byte)6, buffer[5]);
}
[TestMethod]
public void Grow_PastInitialCapacity_BytesPreservedAcrossGrows()
{
// Initial capacity = 16, feed > 16 bytes consecutively (no consume between) → forces grow
using var input = new AsyncPipeReaderInput(16);
var data = new byte[64];
for (var i = 0; i < data.Length; i++) data[i] = (byte)i;
// Feed in chunks that overflow the initial buffer (each wrapped in a chunk frame)
input.Feed(WrapInChunkFrame(data, 0, 16));
input.Feed(WrapInChunkFrame(data, 16, 16)); // grow #1
input.Feed(WrapInChunkFrame(data, 32, 32)); // grow #2
input.Complete();
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public async Task ProducerConsumer_Concurrency_AllBytesDeliveredInOrder()
{
const int totalBytes = 8192;
const int chunkSize = 17; // intentional: not a power of 2, exercises partial fills
using var input = new AsyncPipeReaderInput(64);
var expected = new byte[totalBytes];
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
var consumeTask = Task.Run(() => ConsumeAll(input));
var produceTask = Task.Run(() =>
{
try
{
var offset = 0;
while (offset < expected.Length)
{
var take = Math.Min(chunkSize, expected.Length - offset);
input.Feed(WrapInChunkFrame(expected, offset, take));
offset += take;
}
}
finally
{
input.Complete();
}
});
await Task.WhenAll(consumeTask, produceTask);
var actual = consumeTask.Result;
CollectionAssert.AreEqual(expected, actual);
}
[TestMethod]
public async Task ProducerConsumer_SlidingWindowCycle_ManyResetsHandledCorrectly()
{
// Small initial buffer + slow producer drives many reset-to-0 cycles.
const int totalBytes = 32 * 1024;
const int chunkSize = 7;
using var input = new AsyncPipeReaderInput(32);
var expected = new byte[totalBytes];
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
var consumeTask = Task.Run(() => ConsumeAll(input));
var produceTask = Task.Run(async () =>
{
try
{
var offset = 0;
while (offset < expected.Length)
{
var take = Math.Min(chunkSize, expected.Length - offset);
input.Feed(WrapInChunkFrame(expected, offset, take));
offset += take;
if ((offset & 0x7F) == 0) await Task.Yield();
}
}
finally
{
input.Complete();
}
});
await Task.WhenAll(consumeTask, produceTask);
var actual = consumeTask.Result;
Assert.AreEqual(expected.Length, actual.Length);
CollectionAssert.AreEqual(expected, actual);
}
[TestMethod]
public void Dispose_DoesNotThrow()
{
var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete();
input.Dispose();
}
[TestMethod]
public void Constructor_InvalidCapacity_ThrowsArgumentOutOfRange()
{
_ = Assert.ThrowsExactly<ArgumentOutOfRangeException>(() => new AsyncPipeReaderInput(0));
_ = Assert.ThrowsExactly<ArgumentOutOfRangeException>(() => new AsyncPipeReaderInput(-1));
}
[TestMethod]
public void Feed_PartialFrameAcrossCalls_ParsedCorrectly()
{
// Verifies the framing state machine survives partial frame headers / sizes / data
// split across multiple Feed calls.
using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 10, 20, 30, 40, 50 };
var frame = WrapInChunkFrame(data); // 8 bytes total: [201][05][00][10][20][30][40][50]
// Feed byte-by-byte to stress the state machine
for (var i = 0; i < frame.Length; i++) input.Feed(frame.AsSpan(i, 1));
input.Complete();
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public void Feed_ChunkEndMarker_AutoResetsForNextMessage()
{
// [202] CHUNK_END is end-of-MESSAGE on the WIRE — NOT end-of-session and NOT, by itself,
// a buffer-cursor recycle. On [202], the framing-state machine resets to AwaitingHeader so
// the next [201] header is parsed correctly; buffer-cursor recycling is armed separately by
// the consumer via MessageDone() (typically from the AcBinaryDeserializer.Deserialize<T>(
// AsyncPipeReaderInput, opts) finally block, AFTER the deserialiser has finished reading
// the structurally-complete graph). See BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix.
// Session end is signalled separately by an external Complete() / stream-EOF.
using var input = new AsyncPipeReaderInput(64);
// Message 1
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Feed([202]); // CHUNK_END — framing reset only (no buffer-cursor recycle, no completion)
// First message is consumable
input.Initialize(out var buf1, out var pos1, out var bufLen1);
Assert.AreEqual(3, bufLen1);
Assert.AreEqual(1, buf1[0]);
Assert.AreEqual(2, buf1[1]);
Assert.AreEqual(3, buf1[2]);
// Simulate the AcBinaryDeserializer.Deserialize<T>(input, opts) finally block: the consumer
// calls MessageDone() AFTER it has finished reading the graph. This arms the
// _readPos = -1 sentinel; the next AppendToBuffer for message 2 sees rp < 0 and recycles
// the buffer to 0 (sliding-window cycling).
input.MessageDone();
// Message 2 — same long-lived input, just keeps going
input.Feed(WrapInChunkFrame([10, 20, 30, 40]));
input.Feed([202]);
// Re-initialize for the next deserializer call — the buffer was recycled to 0 by the
// sliding-window cycling triggered when AppendToBuffer saw _readPos == -1 (sentinel
// armed by the MessageDone() call above).
input.Initialize(out var buf2, out var pos2, out var bufLen2);
Assert.AreEqual(4, bufLen2);
Assert.AreEqual(10, buf2[0]);
Assert.AreEqual(20, buf2[1]);
Assert.AreEqual(30, buf2[2]);
Assert.AreEqual(40, buf2[3]);
// Now signal end-of-session explicitly
input.Complete();
// After Complete, TryAdvanceSegment returns false on empty — session truly ended
var pos3 = bufLen2;
var bufLen3 = bufLen2;
var buf3 = buf2;
var hasMore = input.TryAdvanceSegment(ref buf3, ref pos3, ref bufLen3, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Feed_ExternalComplete_SignalsEndOfSession()
{
// Explicit Complete() (or stream-EOF in the DrainFromAsync path) is the session-end signal —
// distinct from per-message [202] markers which only auto-reset for the next message.
using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete(); // external session-end
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(3, bufferLength);
position = bufferLength;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Feed_UnexpectedMarker_ThrowsInvalidDataException()
{
using var input = new AsyncPipeReaderInput(64);
// Byte 0x42 is not 200/201/202 — should throw
_ = Assert.ThrowsExactly<InvalidDataException>(() => input.Feed([0x42]));
}
// ====================================================================
// Step 2 — DrainFromAsync extension (ACCORE-BIN-T-M2K1)
// ====================================================================
[TestMethod]
public async Task DrainFromAsync_NullInput_ThrowsArgumentNullException()
{
var pipe = new Pipe();
await pipe.Writer.CompleteAsync();
await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader));
}
[TestMethod]
public async Task DrainFromAsync_NullReader_ThrowsArgumentNullException()
{
using var input = new AsyncPipeReaderInput(64);
await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await input.DrainFromAsync(null!));
}
[TestMethod]
public async Task DrainFromAsync_PipeWithData_FeedsAllBytes()
{
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
await pipe.Writer.WriteAsync(WrapInChunkFrame(data));
await pipe.Writer.CompleteAsync();
await input.DrainFromAsync(pipe.Reader);
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public async Task DrainFromAsync_EmptyPipeCompleted_CallsCompleteOnInput()
{
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
await pipe.Writer.CompleteAsync();
await input.DrainFromAsync(pipe.Reader);
// After drain, AsyncPipeReaderInput should be completed → TryAdvanceSegment returns false on empty
input.Initialize(out var buffer, out var position, out var bufferLength);
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public async Task DrainFromAsync_ConcurrentWriteDrainConsume_OrderPreserved()
{
// 3-thread pipeline: writer → pipe → drainer → input → consumer
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
const int totalBytes = 4096;
var expected = new byte[totalBytes];
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
var consumeTask = Task.Run(() => ConsumeAll(input));
var drainTask = input.DrainFromAsync(pipe.Reader);
var writeTask = Task.Run(async () =>
{
try
{
const int chunkSize = 31;
var offset = 0;
while (offset < expected.Length)
{
var take = Math.Min(chunkSize, expected.Length - offset);
await pipe.Writer.WriteAsync(WrapInChunkFrame(expected, offset, take));
offset += take;
}
}
finally
{
await pipe.Writer.CompleteAsync();
}
});
await Task.WhenAll(consumeTask, drainTask, writeTask);
var actual = consumeTask.Result;
CollectionAssert.AreEqual(expected, actual);
}
[TestMethod]
public async Task DrainFromAsync_Cancellation_PropagatesAndCallsComplete()
{
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
using var cts = new CancellationTokenSource();
var drainTask = input.DrainFromAsync(pipe.Reader, cts.Token);
cts.Cancel();
await Assert.ThrowsExactlyAsync<OperationCanceledException>(async () => await drainTask);
// Verify Complete was called in the finally block
input.Initialize(out var buffer, out var position, out var bufferLength);
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
// ====================================================================
// Step 3 — Real parallel pipeline test (ACCORE-BIN-T-V7C9)
//
// True 3-task pipeline: AcBinarySerializer writes framed chunks to pipe.Writer via
// AsyncPipeWriterOutput (framed mode under the hood) — drainer pulls from pipe.Reader
// via DrainFromAsync — deserializer reads from AsyncPipeReaderInput (framing-aware Feed).
// All three run concurrently with TRUE serialize↔deserialize overlap (the serializer is
// still writing the tail of the message while the deserializer has already consumed the
// head, courtesy of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput).
//
// BufferWriterChunkSize = 256 → small payloads cross multiple [201][UINT16][data] chunk
// boundaries on the wire, exercising the framing-aware AsyncPipeReaderInput.Feed state
// machine. Wire is uniform AsyncSegment chunked format (per ADR-0003 §9).
// ====================================================================
[TestMethod]
public async Task RealParallelPipeline_SerializeViaPipeWriter_DeserializeViaPipeReader_PayloadEquals()
{
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
var original = CreatePayload(50);
var pipe = new Pipe();
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize<TestParentWithDateTimeItemCollection>(input, opts));
var drainTask = input.DrainFromAsync(pipe.Reader);
var serTask = Task.Run(async () =>
{
try
{
// SerializeChunkedFramed — writes [201][UINT16][data] per chunk on the wire.
// AsyncPipeReaderInput.Feed strips framing internally on the receive side
// (default multiMessage = true).
AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts);
}
finally
{
await pipe.Writer.CompleteAsync();
}
});
await Task.WhenAll(serTask, drainTask, deserTask);
var result = deserTask.Result;
Assert.IsNotNull(result);
AssertPayloadEquals(original, result);
}
[TestMethod]
public async Task RealParallelPipeline_LargeScalePayload_ChunkSize4096_StructuralEquality()
{
// Production-scale payload via TestDataFactory: 100 root items × 3 pallets × 3 measurements × 4 points
// = ~3700 deeply-nested objects with shared references. Serialized size ~few hundred KB →
// many chunks at chunkSize=4096 → real backpressure-driven streaming (PipeWriter pauseThreshold
// ~64KB, bytes flow incrementally as drainer + deserializer task pulls them out).
// This is the most-realistic real-parallel-pipeline test: in-memory Pipe + 3-task overlap +
// production-scale payload + production-scale chunk size.
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 4096 };
var original = TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100);
var pipe = new Pipe();
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize<TestOrder_All_True>(input, opts));
var drainTask = input.DrainFromAsync(pipe.Reader);
var serTask = Task.Run(async () =>
{
try { AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); }
finally { await pipe.Writer.CompleteAsync(); }
});
await Task.WhenAll(serTask, drainTask, deserTask);
var result = deserTask.Result;
Assert.IsNotNull(result);
Assert.AreEqual(original.Id, result.Id);
Assert.AreEqual(original.OrderNumber, result.OrderNumber);
Assert.AreEqual(original.Status, result.Status);
Assert.AreEqual(original.TotalAmount, result.TotalAmount);
var origCounts = CountTestOrderHierarchy(original);
var resultCounts = CountTestOrderHierarchy(result);
Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch");
Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch");
Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch");
Assert.AreEqual(origCounts.points, resultCounts.points, "Points count mismatch");
}
private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder_All_True order)
{
var items = order.Items.Count;
int pallets = 0, measurements = 0, points = 0;
foreach (var item in order.Items)
{
pallets += item.Pallets.Count;
foreach (var p in item.Pallets)
{
measurements += p.Measurements.Count;
points += p.Measurements.Sum(m => m.Points.Count);
}
}
return (items, pallets, measurements, points);
}
// ====================================================================
// Step 4 — AsyncPipeWriterOutput runtime type detect — sanity pinning
// ====================================================================
//
// Guards the architectural assumption that PipeWriter.Create(Stream).GetType() resolves to a
// different runtime type than new Pipe().Writer.GetType(). This is what makes
// AsyncPipeWriterOutput._serializeFlushAndAcquire auto-select between sequential
// (Stream-backed) and parallel (Pipe-based) flush strategies safe — without touching internal
// BCL type names directly. If a future .NET unifies the two writer impls or renames the
// internal type in a way that breaks the detect, these tests fail before prod.
[TestMethod]
public void StreamPipeWriter_AndPipeWriter_AreDistinctTypes()
{
var pipeBased = new Pipe().Writer.GetType();
var streamBased = PipeWriter.Create(Stream.Null).GetType();
// Cornerstone of the runtime detect — must NEVER unify, else _serializeFlushAndAcquire
// would either always-true or always-false, both of which break correctness.
Assert.AreNotEqual(pipeBased, streamBased,
$"Runtime types unified — pipe-based and stream-backed PipeWriter must remain distinct. " +
$"pipeBased={pipeBased.FullName}, streamBased={streamBased.FullName}");
// Living documentation — typenames printed for debugging on future .NET upgrades.
Console.WriteLine($"Pipe.Writer typename: {pipeBased.FullName}");
Console.WriteLine($"PipeWriter.Create(Stream) typename: {streamBased.FullName}");
}
[TestMethod]
public void StreamPipeWriterTypeField_MatchesFactoryResult()
{
// The static field caches the StreamPipeWriter type via PipeWriter.Create(Stream.Null).GetType()
// at class-load time. A second call to the factory MUST yield the same Type instance —
// otherwise the cache is stale and the runtime detect mis-classifies all stream writers.
var freshType = PipeWriter.Create(Stream.Null).GetType();
Assert.AreSame(freshType, AsyncPipeWriterOutput.StreamPipeWriterType,
"Cached StreamPipeWriterType differs from a fresh factory result — the BCL is " +
"behaving non-deterministically (or the test was loaded before AsyncPipeWriterOutput).");
}
[TestMethod]
public void IsAssignableFrom_PipeBasedWriter_ReturnsFalse()
{
// The Pipe.Writer impl must NOT be a StreamPipeWriter (or subclass thereof) — else
// sequential mode would be wrongly selected and we'd lose the parallelism feature.
var pipeBasedType = new Pipe().Writer.GetType();
Assert.IsFalse(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(pipeBasedType),
$"Pipe.Writer typename={pipeBasedType.FullName} is unexpectedly a StreamPipeWriter " +
$"(or subclass) — runtime detect would mis-classify it as sequential.");
}
[TestMethod]
public void IsAssignableFrom_StreamBackedWriters_ReturnsTrue()
{
// PipeWriter.Create(stream) must always yield a StreamPipeWriter (or subclass) —
// even for unusual stream types (file, memory, null).
Type[] writerTypes =
[
PipeWriter.Create(Stream.Null).GetType(),
PipeWriter.Create(new MemoryStream()).GetType(),
];
foreach (var t in writerTypes)
{
Assert.IsTrue(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(t),
$"PipeWriter.Create(<stream>) returned typename={t.FullName} which is not " +
$"assignable to StreamPipeWriterType — the BCL changed its factory contract.");
}
}
// ====================================================================
// Test helpers
// ====================================================================
/// <summary>
/// Wraps a raw payload in a single AsyncSegment chunk frame: <c>[201][UINT16 LE size][data]</c>.
/// Matches the wire format produced by <see cref="AsyncPipeWriterOutput"/> per chunk.
/// </summary>
private static byte[] WrapInChunkFrame(byte[] data) => WrapInChunkFrame(data, 0, data.Length);
private static byte[] WrapInChunkFrame(byte[] data, int offset, int length)
{
var result = new byte[3 + length];
result[0] = 201; // CHUNK_DATA marker
result[1] = (byte)(length & 0xFF); // UINT16 LE size, low byte
result[2] = (byte)((length >> 8) & 0xFF); // UINT16 LE size, high byte
Array.Copy(data, offset, result, 3, length);
return result;
}
/// <summary>
/// Drains the input fully via the IBinaryInputBase contract, returning all consumed bytes.
/// Mimics the consumer pattern that <c>DeserializeSequence&lt;TInput&gt;</c> uses internally.
/// </summary>
private static byte[] ConsumeAll(AsyncPipeReaderInput input)
{
var consumed = new List<byte>();
input.Initialize(out var buffer, out var position, out var bufferLength);
while (true)
{
while (position < bufferLength)
{
consumed.Add(buffer[position]);
position++;
}
if (!input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1))
break;
}
input.Release();
return consumed.ToArray();
}
private static TestParentWithDateTimeItemCollection CreatePayload(int itemCount)
{
var now = DateTime.UtcNow;
var items = new List<TestEntityWithDateTimeAndInt>(itemCount);
for (var i = 0; i < itemCount; i++)
{
items.Add(new TestEntityWithDateTimeAndInt
{
Id = i + 1,
IntValue = i * 3,
Created = now.AddMinutes(-i),
Modified = now.AddMinutes(i),
StatusCode = i % 4,
Name = $"item-{i}"
});
}
return new TestParentWithDateTimeItemCollection
{
Id = 11,
Name = "real-parallel-pipeline",
Created = now,
Items = items
};
}
private static void AssertPayloadEquals(TestParentWithDateTimeItemCollection expected, TestParentWithDateTimeItemCollection actual)
{
Assert.AreEqual(expected.Id, actual.Id);
Assert.AreEqual(expected.Name, actual.Name);
Assert.AreEqual(expected.Created, actual.Created);
Assert.IsNotNull(expected.Items);
Assert.IsNotNull(actual.Items);
Assert.AreEqual(expected.Items.Count, actual.Items.Count);
for (var i = 0; i < expected.Items.Count; i++)
{
var e = expected.Items[i];
var a = actual.Items[i];
Assert.AreEqual(e.Id, a.Id);
Assert.AreEqual(e.IntValue, a.IntValue);
Assert.AreEqual(e.Created, a.Created);
Assert.AreEqual(e.Modified, a.Modified);
Assert.AreEqual(e.StatusCode, a.StatusCode);
Assert.AreEqual(e.Name, a.Name);
}
}
}