730 lines
28 KiB
C#
730 lines
28 KiB
C#
using AyCode.Core.Serializers.Binaries;
|
||
using AyCode.Core.Tests.TestModels;
|
||
using System.IO;
|
||
using System.IO.Pipelines;
|
||
using static AyCode.Core.Tests.TestModels.AcSerializerModels;
|
||
|
||
namespace AyCode.Core.Tests.Serialization;
|
||
|
||
/// <summary>
|
||
/// Unit tests for <see cref="AsyncPipeReaderInput"/> (Step 1, ACCORE-BIN-T-D6H4) and the
|
||
/// <see cref="AsyncPipeReaderInputExtensions.DrainFromAsync"/> extension (Step 2, ACCORE-BIN-T-M2K1),
|
||
/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9), plus runtime type-detect
|
||
/// sanity pinning (Step 4).
|
||
///
|
||
/// <para>Tests run with <see cref="AsyncPipeReaderInput"/>'s default <c>multiMessage = true</c> —
|
||
/// <see cref="AsyncPipeReaderInput.Feed"/> expects the AsyncSegment chunked wire format
|
||
/// <c>[201][UINT16 LE size][data]</c> per chunk, tolerates <c>[200]</c> CHUNK_START prefix, and
|
||
/// signals end-of-stream on <c>[202]</c> CHUNK_END. The <see cref="WrapInChunkFrame"/> helper
|
||
/// wraps test data into single chunk frames; multi-chunk tests concatenate multiple frames.</para>
|
||
///
|
||
/// <para>Wire format identical to <see cref="AsyncPipeWriterOutput"/> framed output and to
|
||
/// SignalR's <c>AcBinaryHubProtocol.TryParseChunkData</c> input — unified across all transports
|
||
/// per ADR-0003 §9.</para>
|
||
/// </summary>
|
||
[TestClass]
|
||
public class AcBinarySerializerPipeParallelTests
|
||
{
|
||
// ====================================================================
|
||
// Step 1 — AsyncPipeReaderInput contract (ACCORE-BIN-T-D6H4)
|
||
// ====================================================================
|
||
|
||
[TestMethod]
|
||
public void Feed_EmptyData_NoOp()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
input.Feed(ReadOnlySpan<byte>.Empty);
|
||
input.Complete();
|
||
|
||
// No data → TryAdvanceSegment returns false immediately
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
Assert.AreEqual(0, bufferLength);
|
||
|
||
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
|
||
Assert.IsFalse(hasMore);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Feed_AppendsBytes_AccessibleViaTryAdvanceSegment()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
|
||
|
||
input.Feed(WrapInChunkFrame(data)); // [201][UINT16=8][1..8]
|
||
input.Complete();
|
||
|
||
var consumed = ConsumeAll(input);
|
||
CollectionAssert.AreEqual(data, consumed);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Initialize_BeforeFeed_ReturnsEmptyBuffer()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
|
||
Assert.IsNotNull(buffer);
|
||
Assert.AreEqual(0, position);
|
||
Assert.AreEqual(0, bufferLength);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Initialize_AfterFeed_ReturnsAvailableData()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
var data = new byte[] { 10, 20, 30 };
|
||
input.Feed(WrapInChunkFrame(data));
|
||
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
|
||
Assert.AreEqual(0, position);
|
||
Assert.AreEqual(3, bufferLength);
|
||
Assert.AreEqual((byte)10, buffer[0]);
|
||
Assert.AreEqual((byte)20, buffer[1]);
|
||
Assert.AreEqual((byte)30, buffer[2]);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Complete_AllConsumed_TryAdvanceSegmentReturnsFalse()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
input.Feed(WrapInChunkFrame([1, 2, 3]));
|
||
input.Complete();
|
||
|
||
// Simulate consumer that has read all 3 bytes
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
position = bufferLength;
|
||
|
||
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
|
||
Assert.IsFalse(hasMore);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Complete_WithLeftoverData_TryAdvanceSegmentReturnsTrueWithRemainder()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
input.Feed(WrapInChunkFrame([1, 2, 3]));
|
||
input.Feed(WrapInChunkFrame([4, 5, 6]));
|
||
input.Complete();
|
||
|
||
// Simulate consumer that has read 3 of 6 bytes — advance should expose the rest
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
Assert.AreEqual(6, bufferLength);
|
||
|
||
position = 3;
|
||
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
|
||
|
||
Assert.IsTrue(hasMore);
|
||
Assert.AreEqual(3, position);
|
||
Assert.AreEqual(6, bufferLength);
|
||
Assert.AreEqual((byte)4, buffer[3]);
|
||
Assert.AreEqual((byte)5, buffer[4]);
|
||
Assert.AreEqual((byte)6, buffer[5]);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Grow_PastInitialCapacity_BytesPreservedAcrossGrows()
|
||
{
|
||
// Initial capacity = 16, feed > 16 bytes consecutively (no consume between) → forces grow
|
||
using var input = new AsyncPipeReaderInput(16);
|
||
|
||
var data = new byte[64];
|
||
for (var i = 0; i < data.Length; i++) data[i] = (byte)i;
|
||
|
||
// Feed in chunks that overflow the initial buffer (each wrapped in a chunk frame)
|
||
input.Feed(WrapInChunkFrame(data, 0, 16));
|
||
input.Feed(WrapInChunkFrame(data, 16, 16)); // grow #1
|
||
input.Feed(WrapInChunkFrame(data, 32, 32)); // grow #2
|
||
input.Complete();
|
||
|
||
var consumed = ConsumeAll(input);
|
||
CollectionAssert.AreEqual(data, consumed);
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task ProducerConsumer_Concurrency_AllBytesDeliveredInOrder()
|
||
{
|
||
const int totalBytes = 8192;
|
||
const int chunkSize = 17; // intentional: not a power of 2, exercises partial fills
|
||
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
var expected = new byte[totalBytes];
|
||
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
|
||
|
||
var consumeTask = Task.Run(() => ConsumeAll(input));
|
||
|
||
var produceTask = Task.Run(() =>
|
||
{
|
||
try
|
||
{
|
||
var offset = 0;
|
||
while (offset < expected.Length)
|
||
{
|
||
var take = Math.Min(chunkSize, expected.Length - offset);
|
||
|
||
input.Feed(WrapInChunkFrame(expected, offset, take));
|
||
offset += take;
|
||
}
|
||
}
|
||
finally
|
||
{
|
||
input.Complete();
|
||
}
|
||
});
|
||
|
||
await Task.WhenAll(consumeTask, produceTask);
|
||
|
||
var actual = consumeTask.Result;
|
||
CollectionAssert.AreEqual(expected, actual);
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task ProducerConsumer_SlidingWindowCycle_ManyResetsHandledCorrectly()
|
||
{
|
||
// Small initial buffer + slow producer drives many reset-to-0 cycles.
|
||
const int totalBytes = 32 * 1024;
|
||
const int chunkSize = 7;
|
||
|
||
using var input = new AsyncPipeReaderInput(32);
|
||
var expected = new byte[totalBytes];
|
||
|
||
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
|
||
|
||
var consumeTask = Task.Run(() => ConsumeAll(input));
|
||
|
||
var produceTask = Task.Run(async () =>
|
||
{
|
||
try
|
||
{
|
||
var offset = 0;
|
||
while (offset < expected.Length)
|
||
{
|
||
var take = Math.Min(chunkSize, expected.Length - offset);
|
||
input.Feed(WrapInChunkFrame(expected, offset, take));
|
||
offset += take;
|
||
if ((offset & 0x7F) == 0) await Task.Yield();
|
||
}
|
||
}
|
||
finally
|
||
{
|
||
input.Complete();
|
||
}
|
||
});
|
||
|
||
await Task.WhenAll(consumeTask, produceTask);
|
||
|
||
var actual = consumeTask.Result;
|
||
|
||
Assert.AreEqual(expected.Length, actual.Length);
|
||
CollectionAssert.AreEqual(expected, actual);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Dispose_DoesNotThrow()
|
||
{
|
||
var input = new AsyncPipeReaderInput(64);
|
||
input.Feed(WrapInChunkFrame([1, 2, 3]));
|
||
input.Complete();
|
||
|
||
input.Dispose();
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Constructor_InvalidCapacity_ThrowsArgumentOutOfRange()
|
||
{
|
||
_ = Assert.ThrowsExactly<ArgumentOutOfRangeException>(() => new AsyncPipeReaderInput(0));
|
||
_ = Assert.ThrowsExactly<ArgumentOutOfRangeException>(() => new AsyncPipeReaderInput(-1));
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Feed_PartialFrameAcrossCalls_ParsedCorrectly()
|
||
{
|
||
// Verifies the framing state machine survives partial frame headers / sizes / data
|
||
// split across multiple Feed calls.
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
var data = new byte[] { 10, 20, 30, 40, 50 };
|
||
var frame = WrapInChunkFrame(data); // 8 bytes total: [201][05][00][10][20][30][40][50]
|
||
|
||
// Feed byte-by-byte to stress the state machine
|
||
for (var i = 0; i < frame.Length; i++) input.Feed(frame.AsSpan(i, 1));
|
||
input.Complete();
|
||
|
||
var consumed = ConsumeAll(input);
|
||
CollectionAssert.AreEqual(data, consumed);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Feed_ChunkEndMarker_AutoResetsForNextMessage()
|
||
{
|
||
// [202] CHUNK_END is end-of-MESSAGE on the WIRE — NOT end-of-session and NOT, by itself,
|
||
// a buffer-cursor recycle. On [202], the framing-state machine resets to AwaitingHeader so
|
||
// the next [201] header is parsed correctly; buffer-cursor recycling is armed separately by
|
||
// the consumer via MessageDone() (typically from the AcBinaryDeserializer.Deserialize<T>(
|
||
// AsyncPipeReaderInput, opts) finally block, AFTER the deserialiser has finished reading
|
||
// the structurally-complete graph). See BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix.
|
||
// Session end is signalled separately by an external Complete() / stream-EOF.
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
// Message 1
|
||
input.Feed(WrapInChunkFrame([1, 2, 3]));
|
||
input.Feed([202]); // CHUNK_END — framing reset only (no buffer-cursor recycle, no completion)
|
||
|
||
// First message is consumable
|
||
input.Initialize(out var buf1, out var pos1, out var bufLen1);
|
||
Assert.AreEqual(3, bufLen1);
|
||
Assert.AreEqual(1, buf1[0]);
|
||
Assert.AreEqual(2, buf1[1]);
|
||
Assert.AreEqual(3, buf1[2]);
|
||
|
||
// Simulate the AcBinaryDeserializer.Deserialize<T>(input, opts) finally block: the consumer
|
||
// calls MessageDone() AFTER it has finished reading the graph. This arms the
|
||
// _readPos = -1 sentinel; the next AppendToBuffer for message 2 sees rp < 0 and recycles
|
||
// the buffer to 0 (sliding-window cycling).
|
||
input.MessageDone();
|
||
|
||
// Message 2 — same long-lived input, just keeps going
|
||
input.Feed(WrapInChunkFrame([10, 20, 30, 40]));
|
||
input.Feed([202]);
|
||
|
||
// Re-initialize for the next deserializer call — the buffer was recycled to 0 by the
|
||
// sliding-window cycling triggered when AppendToBuffer saw _readPos == -1 (sentinel
|
||
// armed by the MessageDone() call above).
|
||
input.Initialize(out var buf2, out var pos2, out var bufLen2);
|
||
Assert.AreEqual(4, bufLen2);
|
||
Assert.AreEqual(10, buf2[0]);
|
||
Assert.AreEqual(20, buf2[1]);
|
||
Assert.AreEqual(30, buf2[2]);
|
||
Assert.AreEqual(40, buf2[3]);
|
||
|
||
// Now signal end-of-session explicitly
|
||
input.Complete();
|
||
|
||
// After Complete, TryAdvanceSegment returns false on empty — session truly ended
|
||
var pos3 = bufLen2;
|
||
var bufLen3 = bufLen2;
|
||
var buf3 = buf2;
|
||
var hasMore = input.TryAdvanceSegment(ref buf3, ref pos3, ref bufLen3, 1);
|
||
Assert.IsFalse(hasMore);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Feed_ExternalComplete_SignalsEndOfSession()
|
||
{
|
||
// Explicit Complete() (or stream-EOF in the DrainFromAsync path) is the session-end signal —
|
||
// distinct from per-message [202] markers which only auto-reset for the next message.
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
input.Feed(WrapInChunkFrame([1, 2, 3]));
|
||
input.Complete(); // external session-end
|
||
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
Assert.AreEqual(3, bufferLength);
|
||
|
||
position = bufferLength;
|
||
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
|
||
Assert.IsFalse(hasMore);
|
||
}
|
||
|
||
[TestMethod]
|
||
public void Feed_UnexpectedMarker_ThrowsInvalidDataException()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
// Byte 0x42 is not 200/201/202 — should throw
|
||
_ = Assert.ThrowsExactly<InvalidDataException>(() => input.Feed([0x42]));
|
||
}
|
||
|
||
// ====================================================================
|
||
// Step 2 — DrainFromAsync extension (ACCORE-BIN-T-M2K1)
|
||
// ====================================================================
|
||
|
||
[TestMethod]
|
||
public async Task DrainFromAsync_NullInput_ThrowsArgumentNullException()
|
||
{
|
||
var pipe = new Pipe();
|
||
await pipe.Writer.CompleteAsync();
|
||
|
||
await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader));
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task DrainFromAsync_NullReader_ThrowsArgumentNullException()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
|
||
await Assert.ThrowsExactlyAsync<ArgumentNullException>(async () => await input.DrainFromAsync(null!));
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task DrainFromAsync_PipeWithData_FeedsAllBytes()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
var pipe = new Pipe();
|
||
|
||
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
|
||
await pipe.Writer.WriteAsync(WrapInChunkFrame(data));
|
||
await pipe.Writer.CompleteAsync();
|
||
|
||
await input.DrainFromAsync(pipe.Reader);
|
||
|
||
var consumed = ConsumeAll(input);
|
||
CollectionAssert.AreEqual(data, consumed);
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task DrainFromAsync_EmptyPipeCompleted_CallsCompleteOnInput()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
var pipe = new Pipe();
|
||
await pipe.Writer.CompleteAsync();
|
||
|
||
await input.DrainFromAsync(pipe.Reader);
|
||
|
||
// After drain, AsyncPipeReaderInput should be completed → TryAdvanceSegment returns false on empty
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
|
||
Assert.IsFalse(hasMore);
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task DrainFromAsync_ConcurrentWriteDrainConsume_OrderPreserved()
|
||
{
|
||
// 3-thread pipeline: writer → pipe → drainer → input → consumer
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
var pipe = new Pipe();
|
||
|
||
const int totalBytes = 4096;
|
||
var expected = new byte[totalBytes];
|
||
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
|
||
|
||
var consumeTask = Task.Run(() => ConsumeAll(input));
|
||
var drainTask = input.DrainFromAsync(pipe.Reader);
|
||
|
||
var writeTask = Task.Run(async () =>
|
||
{
|
||
try
|
||
{
|
||
const int chunkSize = 31;
|
||
var offset = 0;
|
||
while (offset < expected.Length)
|
||
{
|
||
var take = Math.Min(chunkSize, expected.Length - offset);
|
||
await pipe.Writer.WriteAsync(WrapInChunkFrame(expected, offset, take));
|
||
offset += take;
|
||
}
|
||
}
|
||
finally
|
||
{
|
||
await pipe.Writer.CompleteAsync();
|
||
}
|
||
});
|
||
|
||
await Task.WhenAll(consumeTask, drainTask, writeTask);
|
||
|
||
var actual = consumeTask.Result;
|
||
CollectionAssert.AreEqual(expected, actual);
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task DrainFromAsync_Cancellation_PropagatesAndCallsComplete()
|
||
{
|
||
using var input = new AsyncPipeReaderInput(64);
|
||
var pipe = new Pipe();
|
||
using var cts = new CancellationTokenSource();
|
||
|
||
var drainTask = input.DrainFromAsync(pipe.Reader, cts.Token);
|
||
|
||
cts.Cancel();
|
||
|
||
await Assert.ThrowsExactlyAsync<OperationCanceledException>(async () => await drainTask);
|
||
|
||
// Verify Complete was called in the finally block
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
|
||
Assert.IsFalse(hasMore);
|
||
}
|
||
|
||
// ====================================================================
|
||
// Step 3 — Real parallel pipeline test (ACCORE-BIN-T-V7C9)
|
||
//
|
||
// True 3-task pipeline: AcBinarySerializer writes framed chunks to pipe.Writer via
|
||
// AsyncPipeWriterOutput (framed mode under the hood) — drainer pulls from pipe.Reader
|
||
// via DrainFromAsync — deserializer reads from AsyncPipeReaderInput (framing-aware Feed).
|
||
// All three run concurrently with TRUE serialize↔deserialize overlap (the serializer is
|
||
// still writing the tail of the message while the deserializer has already consumed the
|
||
// head, courtesy of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput).
|
||
//
|
||
// BufferWriterChunkSize = 256 → small payloads cross multiple [201][UINT16][data] chunk
|
||
// boundaries on the wire, exercising the framing-aware AsyncPipeReaderInput.Feed state
|
||
// machine. Wire is uniform AsyncSegment chunked format (per ADR-0003 §9).
|
||
// ====================================================================
|
||
|
||
[TestMethod]
|
||
public async Task RealParallelPipeline_SerializeViaPipeWriter_DeserializeViaPipeReader_PayloadEquals()
|
||
{
|
||
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
|
||
var original = CreatePayload(50);
|
||
|
||
var pipe = new Pipe();
|
||
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
|
||
|
||
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize<TestParentWithDateTimeItemCollection>(input, opts));
|
||
|
||
var drainTask = input.DrainFromAsync(pipe.Reader);
|
||
|
||
var serTask = Task.Run(async () =>
|
||
{
|
||
try
|
||
{
|
||
// SerializeChunkedFramed — writes [201][UINT16][data] per chunk on the wire.
|
||
// AsyncPipeReaderInput.Feed strips framing internally on the receive side
|
||
// (default multiMessage = true).
|
||
AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts);
|
||
}
|
||
finally
|
||
{
|
||
await pipe.Writer.CompleteAsync();
|
||
}
|
||
});
|
||
|
||
await Task.WhenAll(serTask, drainTask, deserTask);
|
||
|
||
var result = deserTask.Result;
|
||
Assert.IsNotNull(result);
|
||
AssertPayloadEquals(original, result);
|
||
}
|
||
|
||
[TestMethod]
|
||
public async Task RealParallelPipeline_LargeScalePayload_ChunkSize4096_StructuralEquality()
|
||
{
|
||
// Production-scale payload via TestDataFactory: 100 root items × 3 pallets × 3 measurements × 4 points
|
||
// = ~3700 deeply-nested objects with shared references. Serialized size ~few hundred KB →
|
||
// many chunks at chunkSize=4096 → real backpressure-driven streaming (PipeWriter pauseThreshold
|
||
// ~64KB, bytes flow incrementally as drainer + deserializer task pulls them out).
|
||
// This is the most-realistic real-parallel-pipeline test: in-memory Pipe + 3-task overlap +
|
||
// production-scale payload + production-scale chunk size.
|
||
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 4096 };
|
||
var original = TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100);
|
||
|
||
var pipe = new Pipe();
|
||
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
|
||
|
||
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize<TestOrder_All_True>(input, opts));
|
||
var drainTask = input.DrainFromAsync(pipe.Reader);
|
||
var serTask = Task.Run(async () =>
|
||
{
|
||
try { AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); }
|
||
finally { await pipe.Writer.CompleteAsync(); }
|
||
});
|
||
|
||
await Task.WhenAll(serTask, drainTask, deserTask);
|
||
|
||
var result = deserTask.Result;
|
||
Assert.IsNotNull(result);
|
||
Assert.AreEqual(original.Id, result.Id);
|
||
Assert.AreEqual(original.OrderNumber, result.OrderNumber);
|
||
Assert.AreEqual(original.Status, result.Status);
|
||
Assert.AreEqual(original.TotalAmount, result.TotalAmount);
|
||
|
||
var origCounts = CountTestOrderHierarchy(original);
|
||
var resultCounts = CountTestOrderHierarchy(result);
|
||
|
||
Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch");
|
||
Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch");
|
||
Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch");
|
||
Assert.AreEqual(origCounts.points, resultCounts.points, "Points count mismatch");
|
||
}
|
||
|
||
private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder_All_True order)
|
||
{
|
||
var items = order.Items.Count;
|
||
int pallets = 0, measurements = 0, points = 0;
|
||
foreach (var item in order.Items)
|
||
{
|
||
pallets += item.Pallets.Count;
|
||
foreach (var p in item.Pallets)
|
||
{
|
||
measurements += p.Measurements.Count;
|
||
points += p.Measurements.Sum(m => m.Points.Count);
|
||
}
|
||
}
|
||
return (items, pallets, measurements, points);
|
||
}
|
||
|
||
// ====================================================================
|
||
// Step 4 — AsyncPipeWriterOutput runtime type detect — sanity pinning
|
||
// ====================================================================
|
||
//
|
||
// Guards the architectural assumption that PipeWriter.Create(Stream).GetType() resolves to a
|
||
// different runtime type than new Pipe().Writer.GetType(). This is what makes
|
||
// AsyncPipeWriterOutput._serializeFlushAndAcquire auto-select between sequential
|
||
// (Stream-backed) and parallel (Pipe-based) flush strategies safe — without touching internal
|
||
// BCL type names directly. If a future .NET unifies the two writer impls or renames the
|
||
// internal type in a way that breaks the detect, these tests fail before prod.
|
||
|
||
[TestMethod]
|
||
public void StreamPipeWriter_AndPipeWriter_AreDistinctTypes()
|
||
{
|
||
var pipeBased = new Pipe().Writer.GetType();
|
||
var streamBased = PipeWriter.Create(Stream.Null).GetType();
|
||
|
||
// Cornerstone of the runtime detect — must NEVER unify, else _serializeFlushAndAcquire
|
||
// would either always-true or always-false, both of which break correctness.
|
||
Assert.AreNotEqual(pipeBased, streamBased,
|
||
$"Runtime types unified — pipe-based and stream-backed PipeWriter must remain distinct. " +
|
||
$"pipeBased={pipeBased.FullName}, streamBased={streamBased.FullName}");
|
||
|
||
// Living documentation — typenames printed for debugging on future .NET upgrades.
|
||
Console.WriteLine($"Pipe.Writer typename: {pipeBased.FullName}");
|
||
Console.WriteLine($"PipeWriter.Create(Stream) typename: {streamBased.FullName}");
|
||
}
|
||
|
||
[TestMethod]
|
||
public void StreamPipeWriterTypeField_MatchesFactoryResult()
|
||
{
|
||
// The static field caches the StreamPipeWriter type via PipeWriter.Create(Stream.Null).GetType()
|
||
// at class-load time. A second call to the factory MUST yield the same Type instance —
|
||
// otherwise the cache is stale and the runtime detect mis-classifies all stream writers.
|
||
var freshType = PipeWriter.Create(Stream.Null).GetType();
|
||
|
||
Assert.AreSame(freshType, AsyncPipeWriterOutput.StreamPipeWriterType,
|
||
"Cached StreamPipeWriterType differs from a fresh factory result — the BCL is " +
|
||
"behaving non-deterministically (or the test was loaded before AsyncPipeWriterOutput).");
|
||
}
|
||
|
||
[TestMethod]
|
||
public void IsAssignableFrom_PipeBasedWriter_ReturnsFalse()
|
||
{
|
||
// The Pipe.Writer impl must NOT be a StreamPipeWriter (or subclass thereof) — else
|
||
// sequential mode would be wrongly selected and we'd lose the parallelism feature.
|
||
var pipeBasedType = new Pipe().Writer.GetType();
|
||
|
||
Assert.IsFalse(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(pipeBasedType),
|
||
$"Pipe.Writer typename={pipeBasedType.FullName} is unexpectedly a StreamPipeWriter " +
|
||
$"(or subclass) — runtime detect would mis-classify it as sequential.");
|
||
}
|
||
|
||
[TestMethod]
|
||
public void IsAssignableFrom_StreamBackedWriters_ReturnsTrue()
|
||
{
|
||
// PipeWriter.Create(stream) must always yield a StreamPipeWriter (or subclass) —
|
||
// even for unusual stream types (file, memory, null).
|
||
Type[] writerTypes =
|
||
[
|
||
PipeWriter.Create(Stream.Null).GetType(),
|
||
PipeWriter.Create(new MemoryStream()).GetType(),
|
||
];
|
||
|
||
foreach (var t in writerTypes)
|
||
{
|
||
Assert.IsTrue(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(t),
|
||
$"PipeWriter.Create(<stream>) returned typename={t.FullName} which is not " +
|
||
$"assignable to StreamPipeWriterType — the BCL changed its factory contract.");
|
||
}
|
||
}
|
||
|
||
// ====================================================================
|
||
// Test helpers
|
||
// ====================================================================
|
||
|
||
/// <summary>
|
||
/// Wraps a raw payload in a single AsyncSegment chunk frame: <c>[201][UINT16 LE size][data]</c>.
|
||
/// Matches the wire format produced by <see cref="AsyncPipeWriterOutput"/> per chunk.
|
||
/// </summary>
|
||
private static byte[] WrapInChunkFrame(byte[] data) => WrapInChunkFrame(data, 0, data.Length);
|
||
|
||
private static byte[] WrapInChunkFrame(byte[] data, int offset, int length)
|
||
{
|
||
var result = new byte[3 + length];
|
||
|
||
result[0] = 201; // CHUNK_DATA marker
|
||
result[1] = (byte)(length & 0xFF); // UINT16 LE size, low byte
|
||
result[2] = (byte)((length >> 8) & 0xFF); // UINT16 LE size, high byte
|
||
|
||
Array.Copy(data, offset, result, 3, length);
|
||
return result;
|
||
}
|
||
|
||
/// <summary>
|
||
/// Drains the input fully via the IBinaryInputBase contract, returning all consumed bytes.
|
||
/// Mimics the consumer pattern that <c>DeserializeSequence<TInput></c> uses internally.
|
||
/// </summary>
|
||
private static byte[] ConsumeAll(AsyncPipeReaderInput input)
|
||
{
|
||
var consumed = new List<byte>();
|
||
input.Initialize(out var buffer, out var position, out var bufferLength);
|
||
|
||
while (true)
|
||
{
|
||
while (position < bufferLength)
|
||
{
|
||
consumed.Add(buffer[position]);
|
||
position++;
|
||
}
|
||
|
||
if (!input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1))
|
||
break;
|
||
}
|
||
|
||
input.Release();
|
||
return consumed.ToArray();
|
||
}
|
||
|
||
private static TestParentWithDateTimeItemCollection CreatePayload(int itemCount)
|
||
{
|
||
var now = DateTime.UtcNow;
|
||
var items = new List<TestEntityWithDateTimeAndInt>(itemCount);
|
||
|
||
for (var i = 0; i < itemCount; i++)
|
||
{
|
||
items.Add(new TestEntityWithDateTimeAndInt
|
||
{
|
||
Id = i + 1,
|
||
IntValue = i * 3,
|
||
Created = now.AddMinutes(-i),
|
||
Modified = now.AddMinutes(i),
|
||
StatusCode = i % 4,
|
||
Name = $"item-{i}"
|
||
});
|
||
}
|
||
|
||
return new TestParentWithDateTimeItemCollection
|
||
{
|
||
Id = 11,
|
||
Name = "real-parallel-pipeline",
|
||
Created = now,
|
||
Items = items
|
||
};
|
||
}
|
||
|
||
private static void AssertPayloadEquals(TestParentWithDateTimeItemCollection expected, TestParentWithDateTimeItemCollection actual)
|
||
{
|
||
Assert.AreEqual(expected.Id, actual.Id);
|
||
Assert.AreEqual(expected.Name, actual.Name);
|
||
Assert.AreEqual(expected.Created, actual.Created);
|
||
|
||
Assert.IsNotNull(expected.Items);
|
||
Assert.IsNotNull(actual.Items);
|
||
Assert.AreEqual(expected.Items.Count, actual.Items.Count);
|
||
|
||
for (var i = 0; i < expected.Items.Count; i++)
|
||
{
|
||
var e = expected.Items[i];
|
||
var a = actual.Items[i];
|
||
|
||
Assert.AreEqual(e.Id, a.Id);
|
||
Assert.AreEqual(e.IntValue, a.IntValue);
|
||
Assert.AreEqual(e.Created, a.Created);
|
||
Assert.AreEqual(e.Modified, a.Modified);
|
||
Assert.AreEqual(e.StatusCode, a.StatusCode);
|
||
Assert.AreEqual(e.Name, a.Name);
|
||
}
|
||
}
|
||
}
|