using AyCode.Core.Serializers.Binaries;
using AyCode.Core.Tests.TestModels;
using System.IO;
using System.IO.Pipelines;
using static AyCode.Core.Tests.TestModels.AcSerializerModels;
namespace AyCode.Core.Tests.Serialization;
///
/// Unit tests for (Step 1, ACCORE-BIN-T-D6H4) and the
/// extension (Step 2, ACCORE-BIN-T-M2K1),
/// plus the real parallel pipeline test (Step 3, ACCORE-BIN-T-V7C9), plus runtime type-detect
/// sanity pinning (Step 4).
///
/// Tests run with 's default multiMessage = true —
/// expects the AsyncSegment chunked wire format
/// [201][UINT16 LE size][data] per chunk, tolerates [200] CHUNK_START prefix, and
/// signals end-of-stream on [202] CHUNK_END. The helper
/// wraps test data into single chunk frames; multi-chunk tests concatenate multiple frames.
///
/// Wire format identical to framed output and to
/// SignalR's AcBinaryHubProtocol.TryParseChunkData input — unified across all transports
/// per ADR-0003 §9.
///
[TestClass]
public class AcBinarySerializerPipeParallelTests
{
// ====================================================================
// Step 1 — AsyncPipeReaderInput contract (ACCORE-BIN-T-D6H4)
// ====================================================================
[TestMethod]
public void Feed_EmptyData_NoOp()
{
using var input = new AsyncPipeReaderInput(64);
input.Feed(ReadOnlySpan.Empty);
input.Complete();
// No data → TryAdvanceSegment returns false immediately
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(0, bufferLength);
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Feed_AppendsBytes_AccessibleViaTryAdvanceSegment()
{
using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
input.Feed(WrapInChunkFrame(data)); // [201][UINT16=8][1..8]
input.Complete();
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public void Initialize_BeforeFeed_ReturnsEmptyBuffer()
{
using var input = new AsyncPipeReaderInput(64);
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.IsNotNull(buffer);
Assert.AreEqual(0, position);
Assert.AreEqual(0, bufferLength);
}
[TestMethod]
public void Initialize_AfterFeed_ReturnsAvailableData()
{
using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 10, 20, 30 };
input.Feed(WrapInChunkFrame(data));
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(0, position);
Assert.AreEqual(3, bufferLength);
Assert.AreEqual((byte)10, buffer[0]);
Assert.AreEqual((byte)20, buffer[1]);
Assert.AreEqual((byte)30, buffer[2]);
}
[TestMethod]
public void Complete_AllConsumed_TryAdvanceSegmentReturnsFalse()
{
using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete();
// Simulate consumer that has read all 3 bytes
input.Initialize(out var buffer, out var position, out var bufferLength);
position = bufferLength;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Complete_WithLeftoverData_TryAdvanceSegmentReturnsTrueWithRemainder()
{
using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Feed(WrapInChunkFrame([4, 5, 6]));
input.Complete();
// Simulate consumer that has read 3 of 6 bytes — advance should expose the rest
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(6, bufferLength);
position = 3;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsTrue(hasMore);
Assert.AreEqual(3, position);
Assert.AreEqual(6, bufferLength);
Assert.AreEqual((byte)4, buffer[3]);
Assert.AreEqual((byte)5, buffer[4]);
Assert.AreEqual((byte)6, buffer[5]);
}
[TestMethod]
public void Grow_PastInitialCapacity_BytesPreservedAcrossGrows()
{
// Initial capacity = 16, feed > 16 bytes consecutively (no consume between) → forces grow
using var input = new AsyncPipeReaderInput(16);
var data = new byte[64];
for (var i = 0; i < data.Length; i++) data[i] = (byte)i;
// Feed in chunks that overflow the initial buffer (each wrapped in a chunk frame)
input.Feed(WrapInChunkFrame(data, 0, 16));
input.Feed(WrapInChunkFrame(data, 16, 16)); // grow #1
input.Feed(WrapInChunkFrame(data, 32, 32)); // grow #2
input.Complete();
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public async Task ProducerConsumer_Concurrency_AllBytesDeliveredInOrder()
{
const int totalBytes = 8192;
const int chunkSize = 17; // intentional: not a power of 2, exercises partial fills
using var input = new AsyncPipeReaderInput(64);
var expected = new byte[totalBytes];
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
var consumeTask = Task.Run(() => ConsumeAll(input));
var produceTask = Task.Run(() =>
{
try
{
var offset = 0;
while (offset < expected.Length)
{
var take = Math.Min(chunkSize, expected.Length - offset);
input.Feed(WrapInChunkFrame(expected, offset, take));
offset += take;
}
}
finally
{
input.Complete();
}
});
await Task.WhenAll(consumeTask, produceTask);
var actual = consumeTask.Result;
CollectionAssert.AreEqual(expected, actual);
}
[TestMethod]
public async Task ProducerConsumer_SlidingWindowCycle_ManyResetsHandledCorrectly()
{
// Small initial buffer + slow producer drives many reset-to-0 cycles.
const int totalBytes = 32 * 1024;
const int chunkSize = 7;
using var input = new AsyncPipeReaderInput(32);
var expected = new byte[totalBytes];
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
var consumeTask = Task.Run(() => ConsumeAll(input));
var produceTask = Task.Run(async () =>
{
try
{
var offset = 0;
while (offset < expected.Length)
{
var take = Math.Min(chunkSize, expected.Length - offset);
input.Feed(WrapInChunkFrame(expected, offset, take));
offset += take;
if ((offset & 0x7F) == 0) await Task.Yield();
}
}
finally
{
input.Complete();
}
});
await Task.WhenAll(consumeTask, produceTask);
var actual = consumeTask.Result;
Assert.AreEqual(expected.Length, actual.Length);
CollectionAssert.AreEqual(expected, actual);
}
[TestMethod]
public void Dispose_DoesNotThrow()
{
var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete();
input.Dispose();
}
[TestMethod]
public void Constructor_InvalidCapacity_ThrowsArgumentOutOfRange()
{
_ = Assert.ThrowsExactly(() => new AsyncPipeReaderInput(0));
_ = Assert.ThrowsExactly(() => new AsyncPipeReaderInput(-1));
}
[TestMethod]
public void Feed_PartialFrameAcrossCalls_ParsedCorrectly()
{
// Verifies the framing state machine survives partial frame headers / sizes / data
// split across multiple Feed calls.
using var input = new AsyncPipeReaderInput(64);
var data = new byte[] { 10, 20, 30, 40, 50 };
var frame = WrapInChunkFrame(data); // 8 bytes total: [201][05][00][10][20][30][40][50]
// Feed byte-by-byte to stress the state machine
for (var i = 0; i < frame.Length; i++) input.Feed(frame.AsSpan(i, 1));
input.Complete();
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public void Feed_ChunkEndMarker_AutoResetsForNextMessage()
{
// [202] CHUNK_END is end-of-MESSAGE on the WIRE — NOT end-of-session and NOT, by itself,
// a buffer-cursor recycle. On [202], the framing-state machine resets to AwaitingHeader so
// the next [201] header is parsed correctly; buffer-cursor recycling is armed separately by
// the consumer via MessageDone() (typically from the AcBinaryDeserializer.Deserialize(
// AsyncPipeReaderInput, opts) finally block, AFTER the deserialiser has finished reading
// the structurally-complete graph). See BINARY_ISSUES.md#accore-bin-i-q4t8 / R5K2 fix.
// Session end is signalled separately by an external Complete() / stream-EOF.
using var input = new AsyncPipeReaderInput(64);
// Message 1
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Feed([202]); // CHUNK_END — framing reset only (no buffer-cursor recycle, no completion)
// First message is consumable
input.Initialize(out var buf1, out var pos1, out var bufLen1);
Assert.AreEqual(3, bufLen1);
Assert.AreEqual(1, buf1[0]);
Assert.AreEqual(2, buf1[1]);
Assert.AreEqual(3, buf1[2]);
// Simulate the AcBinaryDeserializer.Deserialize(input, opts) finally block: the consumer
// calls MessageDone() AFTER it has finished reading the graph. This arms the
// _readPos = -1 sentinel; the next AppendToBuffer for message 2 sees rp < 0 and recycles
// the buffer to 0 (sliding-window cycling).
input.MessageDone();
// Message 2 — same long-lived input, just keeps going
input.Feed(WrapInChunkFrame([10, 20, 30, 40]));
input.Feed([202]);
// Re-initialize for the next deserializer call — the buffer was recycled to 0 by the
// sliding-window cycling triggered when AppendToBuffer saw _readPos == -1 (sentinel
// armed by the MessageDone() call above).
input.Initialize(out var buf2, out var pos2, out var bufLen2);
Assert.AreEqual(4, bufLen2);
Assert.AreEqual(10, buf2[0]);
Assert.AreEqual(20, buf2[1]);
Assert.AreEqual(30, buf2[2]);
Assert.AreEqual(40, buf2[3]);
// Now signal end-of-session explicitly
input.Complete();
// After Complete, TryAdvanceSegment returns false on empty — session truly ended
var pos3 = bufLen2;
var bufLen3 = bufLen2;
var buf3 = buf2;
var hasMore = input.TryAdvanceSegment(ref buf3, ref pos3, ref bufLen3, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Feed_ExternalComplete_SignalsEndOfSession()
{
// Explicit Complete() (or stream-EOF in the DrainFromAsync path) is the session-end signal —
// distinct from per-message [202] markers which only auto-reset for the next message.
using var input = new AsyncPipeReaderInput(64);
input.Feed(WrapInChunkFrame([1, 2, 3]));
input.Complete(); // external session-end
input.Initialize(out var buffer, out var position, out var bufferLength);
Assert.AreEqual(3, bufferLength);
position = bufferLength;
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public void Feed_UnexpectedMarker_ThrowsInvalidDataException()
{
using var input = new AsyncPipeReaderInput(64);
// Byte 0x42 is not 200/201/202 — should throw
_ = Assert.ThrowsExactly(() => input.Feed([0x42]));
}
// ====================================================================
// Step 2 — DrainFromAsync extension (ACCORE-BIN-T-M2K1)
// ====================================================================
[TestMethod]
public async Task DrainFromAsync_NullInput_ThrowsArgumentNullException()
{
var pipe = new Pipe();
await pipe.Writer.CompleteAsync();
await Assert.ThrowsExactlyAsync(async () => await AsyncPipeReaderInputExtensions.DrainFromAsync(null!, pipe.Reader));
}
[TestMethod]
public async Task DrainFromAsync_NullReader_ThrowsArgumentNullException()
{
using var input = new AsyncPipeReaderInput(64);
await Assert.ThrowsExactlyAsync(async () => await input.DrainFromAsync(null!));
}
[TestMethod]
public async Task DrainFromAsync_PipeWithData_FeedsAllBytes()
{
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
await pipe.Writer.WriteAsync(WrapInChunkFrame(data));
await pipe.Writer.CompleteAsync();
await input.DrainFromAsync(pipe.Reader);
var consumed = ConsumeAll(input);
CollectionAssert.AreEqual(data, consumed);
}
[TestMethod]
public async Task DrainFromAsync_EmptyPipeCompleted_CallsCompleteOnInput()
{
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
await pipe.Writer.CompleteAsync();
await input.DrainFromAsync(pipe.Reader);
// After drain, AsyncPipeReaderInput should be completed → TryAdvanceSegment returns false on empty
input.Initialize(out var buffer, out var position, out var bufferLength);
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
[TestMethod]
public async Task DrainFromAsync_ConcurrentWriteDrainConsume_OrderPreserved()
{
// 3-thread pipeline: writer → pipe → drainer → input → consumer
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
const int totalBytes = 4096;
var expected = new byte[totalBytes];
for (var i = 0; i < totalBytes; i++) expected[i] = (byte)(i & 0xFF);
var consumeTask = Task.Run(() => ConsumeAll(input));
var drainTask = input.DrainFromAsync(pipe.Reader);
var writeTask = Task.Run(async () =>
{
try
{
const int chunkSize = 31;
var offset = 0;
while (offset < expected.Length)
{
var take = Math.Min(chunkSize, expected.Length - offset);
await pipe.Writer.WriteAsync(WrapInChunkFrame(expected, offset, take));
offset += take;
}
}
finally
{
await pipe.Writer.CompleteAsync();
}
});
await Task.WhenAll(consumeTask, drainTask, writeTask);
var actual = consumeTask.Result;
CollectionAssert.AreEqual(expected, actual);
}
[TestMethod]
public async Task DrainFromAsync_Cancellation_PropagatesAndCallsComplete()
{
using var input = new AsyncPipeReaderInput(64);
var pipe = new Pipe();
using var cts = new CancellationTokenSource();
var drainTask = input.DrainFromAsync(pipe.Reader, cts.Token);
cts.Cancel();
await Assert.ThrowsExactlyAsync(async () => await drainTask);
// Verify Complete was called in the finally block
input.Initialize(out var buffer, out var position, out var bufferLength);
var hasMore = input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1);
Assert.IsFalse(hasMore);
}
// ====================================================================
// Step 3 — Real parallel pipeline test (ACCORE-BIN-T-V7C9)
//
// True 3-task pipeline: AcBinarySerializer writes framed chunks to pipe.Writer via
// AsyncPipeWriterOutput (framed mode under the hood) — drainer pulls from pipe.Reader
// via DrainFromAsync — deserializer reads from AsyncPipeReaderInput (framing-aware Feed).
// All three run concurrently with TRUE serialize↔deserialize overlap (the serializer is
// still writing the tail of the message while the deserializer has already consumed the
// head, courtesy of per-chunk SyncAwaitFlush in AsyncPipeWriterOutput).
//
// BufferWriterChunkSize = 256 → small payloads cross multiple [201][UINT16][data] chunk
// boundaries on the wire, exercising the framing-aware AsyncPipeReaderInput.Feed state
// machine. Wire is uniform AsyncSegment chunked format (per ADR-0003 §9).
// ====================================================================
[TestMethod]
public async Task RealParallelPipeline_SerializeViaPipeWriter_DeserializeViaPipeReader_PayloadEquals()
{
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 256 };
var original = CreatePayload(50);
var pipe = new Pipe();
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, opts));
var drainTask = input.DrainFromAsync(pipe.Reader);
var serTask = Task.Run(async () =>
{
try
{
// SerializeChunkedFramed — writes [201][UINT16][data] per chunk on the wire.
// AsyncPipeReaderInput.Feed strips framing internally on the receive side
// (default multiMessage = true).
AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts);
}
finally
{
await pipe.Writer.CompleteAsync();
}
});
await Task.WhenAll(serTask, drainTask, deserTask);
var result = deserTask.Result;
Assert.IsNotNull(result);
AssertPayloadEquals(original, result);
}
[TestMethod]
public async Task RealParallelPipeline_LargeScalePayload_ChunkSize4096_StructuralEquality()
{
// Production-scale payload via TestDataFactory: 100 root items × 3 pallets × 3 measurements × 4 points
// = ~3700 deeply-nested objects with shared references. Serialized size ~few hundred KB →
// many chunks at chunkSize=4096 → real backpressure-driven streaming (PipeWriter pauseThreshold
// ~64KB, bytes flow incrementally as drainer + deserializer task pulls them out).
// This is the most-realistic real-parallel-pipeline test: in-memory Pipe + 3-task overlap +
// production-scale payload + production-scale chunk size.
var opts = new AcBinarySerializerOptions { BufferWriterChunkSize = 4096 };
var original = TestDataFactory.CreateLargeScaleBenchmarkOrder(rootItemCount: 100);
var pipe = new Pipe();
using var input = new AsyncPipeReaderInput(initialCapacity: opts.BufferWriterChunkSize * 2);
var deserTask = Task.Run(() => AcBinaryDeserializer.Deserialize(input, opts));
var drainTask = input.DrainFromAsync(pipe.Reader);
var serTask = Task.Run(async () =>
{
try { AcBinarySerializer.SerializeChunkedFramed(original, pipe.Writer, opts); }
finally { await pipe.Writer.CompleteAsync(); }
});
await Task.WhenAll(serTask, drainTask, deserTask);
var result = deserTask.Result;
Assert.IsNotNull(result);
Assert.AreEqual(original.Id, result.Id);
Assert.AreEqual(original.OrderNumber, result.OrderNumber);
Assert.AreEqual(original.Status, result.Status);
Assert.AreEqual(original.TotalAmount, result.TotalAmount);
var origCounts = CountTestOrderHierarchy(original);
var resultCounts = CountTestOrderHierarchy(result);
Assert.AreEqual(origCounts.items, resultCounts.items, "Items count mismatch");
Assert.AreEqual(origCounts.pallets, resultCounts.pallets, "Pallets count mismatch");
Assert.AreEqual(origCounts.measurements, resultCounts.measurements, "Measurements count mismatch");
Assert.AreEqual(origCounts.points, resultCounts.points, "Points count mismatch");
}
private static (int items, int pallets, int measurements, int points) CountTestOrderHierarchy(TestOrder order)
{
var items = order.Items.Count;
int pallets = 0, measurements = 0, points = 0;
foreach (var item in order.Items)
{
pallets += item.Pallets.Count;
foreach (var p in item.Pallets)
{
measurements += p.Measurements.Count;
points += p.Measurements.Sum(m => m.Points.Count);
}
}
return (items, pallets, measurements, points);
}
// ====================================================================
// Step 4 — AsyncPipeWriterOutput runtime type detect — sanity pinning
// ====================================================================
//
// Guards the architectural assumption that PipeWriter.Create(Stream).GetType() resolves to a
// different runtime type than new Pipe().Writer.GetType(). This is what makes
// AsyncPipeWriterOutput._serializeFlushAndAcquire auto-select between sequential
// (Stream-backed) and parallel (Pipe-based) flush strategies safe — without touching internal
// BCL type names directly. If a future .NET unifies the two writer impls or renames the
// internal type in a way that breaks the detect, these tests fail before prod.
[TestMethod]
public void StreamPipeWriter_AndPipeWriter_AreDistinctTypes()
{
var pipeBased = new Pipe().Writer.GetType();
var streamBased = PipeWriter.Create(Stream.Null).GetType();
// Cornerstone of the runtime detect — must NEVER unify, else _serializeFlushAndAcquire
// would either always-true or always-false, both of which break correctness.
Assert.AreNotEqual(pipeBased, streamBased,
$"Runtime types unified — pipe-based and stream-backed PipeWriter must remain distinct. " +
$"pipeBased={pipeBased.FullName}, streamBased={streamBased.FullName}");
// Living documentation — typenames printed for debugging on future .NET upgrades.
Console.WriteLine($"Pipe.Writer typename: {pipeBased.FullName}");
Console.WriteLine($"PipeWriter.Create(Stream) typename: {streamBased.FullName}");
}
[TestMethod]
public void StreamPipeWriterTypeField_MatchesFactoryResult()
{
// The static field caches the StreamPipeWriter type via PipeWriter.Create(Stream.Null).GetType()
// at class-load time. A second call to the factory MUST yield the same Type instance —
// otherwise the cache is stale and the runtime detect mis-classifies all stream writers.
var freshType = PipeWriter.Create(Stream.Null).GetType();
Assert.AreSame(freshType, AsyncPipeWriterOutput.StreamPipeWriterType,
"Cached StreamPipeWriterType differs from a fresh factory result — the BCL is " +
"behaving non-deterministically (or the test was loaded before AsyncPipeWriterOutput).");
}
[TestMethod]
public void IsAssignableFrom_PipeBasedWriter_ReturnsFalse()
{
// The Pipe.Writer impl must NOT be a StreamPipeWriter (or subclass thereof) — else
// sequential mode would be wrongly selected and we'd lose the parallelism feature.
var pipeBasedType = new Pipe().Writer.GetType();
Assert.IsFalse(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(pipeBasedType),
$"Pipe.Writer typename={pipeBasedType.FullName} is unexpectedly a StreamPipeWriter " +
$"(or subclass) — runtime detect would mis-classify it as sequential.");
}
[TestMethod]
public void IsAssignableFrom_StreamBackedWriters_ReturnsTrue()
{
// PipeWriter.Create(stream) must always yield a StreamPipeWriter (or subclass) —
// even for unusual stream types (file, memory, null).
Type[] writerTypes =
[
PipeWriter.Create(Stream.Null).GetType(),
PipeWriter.Create(new MemoryStream()).GetType(),
];
foreach (var t in writerTypes)
{
Assert.IsTrue(AsyncPipeWriterOutput.StreamPipeWriterType.IsAssignableFrom(t),
$"PipeWriter.Create() returned typename={t.FullName} which is not " +
$"assignable to StreamPipeWriterType — the BCL changed its factory contract.");
}
}
// ====================================================================
// Test helpers
// ====================================================================
///
/// Wraps a raw payload in a single AsyncSegment chunk frame: [201][UINT16 LE size][data].
/// Matches the wire format produced by per chunk.
///
private static byte[] WrapInChunkFrame(byte[] data) => WrapInChunkFrame(data, 0, data.Length);
private static byte[] WrapInChunkFrame(byte[] data, int offset, int length)
{
var result = new byte[3 + length];
result[0] = 201; // CHUNK_DATA marker
result[1] = (byte)(length & 0xFF); // UINT16 LE size, low byte
result[2] = (byte)((length >> 8) & 0xFF); // UINT16 LE size, high byte
Array.Copy(data, offset, result, 3, length);
return result;
}
///
/// Drains the input fully via the IBinaryInputBase contract, returning all consumed bytes.
/// Mimics the consumer pattern that DeserializeSequence<TInput> uses internally.
///
private static byte[] ConsumeAll(AsyncPipeReaderInput input)
{
var consumed = new List();
input.Initialize(out var buffer, out var position, out var bufferLength);
while (true)
{
while (position < bufferLength)
{
consumed.Add(buffer[position]);
position++;
}
if (!input.TryAdvanceSegment(ref buffer, ref position, ref bufferLength, 1))
break;
}
input.Release();
return consumed.ToArray();
}
private static TestParentWithDateTimeItemCollection CreatePayload(int itemCount)
{
var now = DateTime.UtcNow;
var items = new List(itemCount);
for (var i = 0; i < itemCount; i++)
{
items.Add(new TestEntityWithDateTimeAndInt
{
Id = i + 1,
IntValue = i * 3,
Created = now.AddMinutes(-i),
Modified = now.AddMinutes(i),
StatusCode = i % 4,
Name = $"item-{i}"
});
}
return new TestParentWithDateTimeItemCollection
{
Id = 11,
Name = "real-parallel-pipeline",
Created = now,
Items = items
};
}
private static void AssertPayloadEquals(TestParentWithDateTimeItemCollection expected, TestParentWithDateTimeItemCollection actual)
{
Assert.AreEqual(expected.Id, actual.Id);
Assert.AreEqual(expected.Name, actual.Name);
Assert.AreEqual(expected.Created, actual.Created);
Assert.IsNotNull(expected.Items);
Assert.IsNotNull(actual.Items);
Assert.AreEqual(expected.Items.Count, actual.Items.Count);
for (var i = 0; i < expected.Items.Count; i++)
{
var e = expected.Items[i];
var a = actual.Items[i];
Assert.AreEqual(e.Id, a.Id);
Assert.AreEqual(e.IntValue, a.IntValue);
Assert.AreEqual(e.Created, a.Created);
Assert.AreEqual(e.Modified, a.Modified);
Assert.AreEqual(e.StatusCode, a.StatusCode);
Assert.AreEqual(e.Name, a.Name);
}
}
}