AyCode.Core/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs

160 lines
5.5 KiB
C#

using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using AyCode.Core.Helpers;
namespace AyCode.Core.Serializers.Binaries;
/// <summary>
/// Binary output that writes to a PipeWriter with per-chunk network flush.
///
/// Identical to BufferWriterBinaryOutput except: Grow() calls PipeWriter.FlushAsync().Forget()
/// after committing each chunk, so data flows to the network as it's being serialized
/// rather than waiting for the full serialization to complete.
///
/// Backpressure: stores the last FlushAsync ValueTask. If the previous flush hasn't completed
/// by the next Grow(), blocks until it does. This bounds memory to ~2 chunks.
///
/// The first Grow() skips the flush to keep the length prefix span valid for patching.
/// </summary>
public struct AsyncPipeWriterOutput : IBinaryOutputBase
{
private readonly PipeWriter _pipeWriter;
private readonly int _chunkSize;
private int _committedBytes;
private int _currentChunkStart;
private bool _ownedBuffer;
private ValueTask<FlushResult> _lastFlush;
private bool _firstGrow;
public AsyncPipeWriterOutput(PipeWriter pipeWriter, int chunkSize = 4096)
{
_pipeWriter = pipeWriter;
_chunkSize = chunkSize;
_committedBytes = 0;
_ownedBuffer = false;
_lastFlush = default;
_firstGrow = true;
}
/// <summary>
/// Provides the initial buffer from the PipeWriter.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Initialize(out byte[] buffer, out int position, out int bufferEnd)
{
_committedBytes = 0;
_lastFlush = default;
_firstGrow = true;
AcquireChunk(_chunkSize, out buffer, out position, out bufferEnd);
_currentChunkStart = position;
}
/// <summary>
/// Called when the context's buffer is full. Commits current chunk to the PipeWriter,
/// fires a background flush (except on the first call — length prefix must stay valid),
/// and acquires a new chunk.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
public void Grow(ref byte[] buffer, ref int position, ref int bufferEnd, int needed)
{
// Backpressure: wait for previous flush if still in progress
if (!_lastFlush.IsCompleted)
_lastFlush.GetAwaiter().GetResult();
// Commit bytes written in current chunk
var bytesInChunk = position - _currentChunkStart;
if (bytesInChunk > 0)
{
if (_ownedBuffer)
FlushOwnedBuffer(buffer, bytesInChunk);
else
_pipeWriter.Advance(bytesInChunk);
_committedBytes += bytesInChunk;
}
// Fire-and-forget flush — EXCEPT first chunk (length prefix span must stay valid)
if (!_firstGrow)
{
_lastFlush = _pipeWriter.FlushAsync();
_lastFlush.Forget();
}
_firstGrow = false;
// Acquire new chunk
AcquireChunk(Math.Max(needed, _chunkSize), out buffer, out position, out bufferEnd);
_currentChunkStart = position;
}
/// <summary>
/// Returns total bytes written: committed + pending in current chunk.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetTotalPosition(int currentPosition)
=> _committedBytes + (currentPosition - _currentChunkStart);
/// <summary>
/// Commits final pending bytes and performs a synchronous flush.
/// Must be called after all writes are complete.
/// </summary>
public void Flush(byte[] buffer, int position)
{
// Wait for any in-flight flush
if (!_lastFlush.IsCompleted)
_lastFlush.GetAwaiter().GetResult();
var bytesInChunk = position - _currentChunkStart;
if (bytesInChunk > 0)
{
if (_ownedBuffer)
FlushOwnedBuffer(buffer, bytesInChunk);
else
_pipeWriter.Advance(bytesInChunk);
}
// Final synchronous flush — ensures all data reaches the network
_pipeWriter.FlushAsync().GetAwaiter().GetResult();
}
/// <summary>
/// No-op for PipeWriter-based output — chunks are owned by PipeWriter, not us.
/// </summary>
public void Reset() { }
[MethodImpl(MethodImplOptions.NoInlining)]
private void FlushOwnedBuffer(byte[] buffer, int bytesInChunk)
{
var span = _pipeWriter.GetSpan(bytesInChunk);
buffer.AsSpan(_currentChunkStart, bytesInChunk).CopyTo(span);
_pipeWriter.Advance(bytesInChunk);
ArrayPool<byte>.Shared.Return(buffer);
_ownedBuffer = false;
}
private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd)
{
var actualRequest = Math.Max(requestSize, _chunkSize);
var memory = _pipeWriter.GetMemory(actualRequest);
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) && segment.Array != null)
{
buffer = segment.Array;
position = segment.Offset;
bufferEnd = segment.Offset + segment.Count;
_ownedBuffer = false;
}
else
{
// Fallback for non-array-backed PipeWriter (e.g. Kestrel PinnedBlockMemoryPool)
var owned = ArrayPool<byte>.Shared.Rent(actualRequest);
buffer = owned;
position = 0;
bufferEnd = owned.Length;
_ownedBuffer = true;
}
}
}