[LOADED_DOCS: 3 files, no new loads]

Refactor AsyncPipeWriterOutput buffer management

Refactored AsyncPipeWriterOutput to lazily allocate and reuse the fallback ArrayPool<byte> buffer across a serialize lifecycle, releasing it only once at the end. Replaced the _ownedBuffer boolean with _hasOwnedBuffer and a nullable _ownedBuffer field. Centralized buffer release logic, updated diagnostics, and improved chunk acquisition to minimize ArrayPool churn and clarify buffer ownership semantics.
This commit is contained in:
Loretta 2026-04-30 06:04:28 +02:00
parent 910b0deab8
commit 96a2f90535
1 changed files with 69 additions and 19 deletions

View File

@ -85,7 +85,14 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
private readonly TimeSpan _flushTimeout;
private int _committedBytes;
private int _currentChunkStart;
private bool _ownedBuffer;
// Whether the current chunk's buffer is the owned ArrayPool fallback (true) or a zero-copy
// PipeWriter slab (false). Used by CommitCurrentChunk to pick the commit strategy.
private bool _hasOwnedBuffer;
// Lazy-allocated, long-lived ArrayPool buffer used as the fallback when PipeWriter.GetMemory
// returns non-array-backed memory (custom non-byte[] PipeWriter — practically never on
// production writers). Reused across chunks within a single serialize lifecycle; returned
// to the pool exactly once in Flush(). Stays null entirely on the zero-copy path.
private byte[]? _ownedBuffer;
private ValueTask<FlushResult> _lastFlush;
/// <summary>
@ -133,7 +140,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
_serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter);
_committedBytes = 0;
_ownedBuffer = false;
_hasOwnedBuffer = false;
_ownedBuffer = null;
_lastFlush = default;
}
@ -233,17 +241,43 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
SyncAwaitFlush(_lastFlush);
CommitCurrentChunk(buffer, position);
// End of serialize lifecycle — return the owned fallback buffer to ArrayPool exactly
// once (NOT per chunk). The buffer was reused across all chunks in this lifecycle;
// releasing it now avoids per-chunk rent/return churn even when the fallback path
// actually fires.
ReleaseOwnedBuffer();
}
/// <summary>
/// No-op for PipeWriter-based output — chunks are owned by PipeWriter, not us.
/// Releases the lazy-allocated owned fallback buffer (if any) back to <see cref="ArrayPool{T}"/>.
/// Idempotent — safe to call multiple times. Called from <see cref="Flush"/> at the end of a
/// successful serialize cycle, and also from <see cref="Reset"/> for defensive cleanup if a
/// caller reuses the struct.
/// </summary>
public void Reset() { }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ReleaseOwnedBuffer()
{
if (_ownedBuffer != null)
{
ArrayPool<byte>.Shared.Return(_ownedBuffer);
_ownedBuffer = null;
}
_hasOwnedBuffer = false;
}
/// <summary>
/// Defensive cleanup — releases the owned fallback buffer if one was retained. Normally a
/// no-op for PipeWriter-based output (chunks are owned by PipeWriter), but covers the rare
/// case where the same struct instance is reused after a serialize without going through Flush.
/// </summary>
public void Reset() => ReleaseOwnedBuffer();
/// <summary>
/// Commits the current chunk to the PipeWriter. In framed mode, patches the reserved
/// <c>[201][UINT16 dataBytes]</c> header before Advance; in raw mode, simply Advances the data.
/// For owned buffers, copies to PipeWriter first.
/// For owned buffers, copies to PipeWriter first (without releasing the buffer — Flush() does
/// that once at the end).
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void CommitCurrentChunk(byte[] buffer, int position)
@ -258,32 +292,35 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes);
EmitDiagnostic($"CommitCurrentChunk[framed]: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})");
EmitDiagnostic($"CommitCurrentChunk[framed]: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _hasOwnedBuffer={_hasOwnedBuffer} → Advance({HeaderSize + dataBytes})");
if (_ownedBuffer) FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes);
if (_hasOwnedBuffer) CopyOwnedToPipeWriter(buffer, headerStart, HeaderSize + dataBytes);
else _pipeWriter.Advance(HeaderSize + dataBytes);
}
else
{
EmitDiagnostic($"CommitCurrentChunk[raw]: dataBytes={dataBytes} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({dataBytes})");
EmitDiagnostic($"CommitCurrentChunk[raw]: dataBytes={dataBytes} _currentChunkStart={_currentChunkStart} position={position} _hasOwnedBuffer={_hasOwnedBuffer} → Advance({dataBytes})");
if (_ownedBuffer) FlushOwnedBuffer(buffer, _currentChunkStart, dataBytes);
if (_hasOwnedBuffer) CopyOwnedToPipeWriter(buffer, _currentChunkStart, dataBytes);
else _pipeWriter.Advance(dataBytes);
}
_committedBytes += dataBytes; // only count data bytes, not framing
}
/// <summary>
/// Copies the owned-buffer chunk into the PipeWriter (since GetMemory returned non-array-backed
/// memory and the serializer needs <c>byte[]</c>). Does NOT return the buffer to the pool —
/// the <see cref="_ownedBuffer"/> reference stays alive across chunks within this serialize
/// lifecycle; <see cref="Flush"/> releases it exactly once at the end.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
private void FlushOwnedBuffer(byte[] buffer, int start, int length)
private void CopyOwnedToPipeWriter(byte[] buffer, int start, int length)
{
var span = _pipeWriter.GetSpan(length);
buffer.AsSpan(start, length).CopyTo(span);
_pipeWriter.Advance(length);
ArrayPool<byte>.Shared.Return(buffer);
_ownedBuffer = false;
}
private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd)
@ -303,19 +340,32 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase
buffer = segment.Array;
position = segment.Offset + headerOffset;
bufferEnd = segment.Offset + headerOffset + dataSize;
_ownedBuffer = false;
_hasOwnedBuffer = false;
EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]");
}
else
{
var owned = ArrayPool<byte>.Shared.Rent(totalRequest);
buffer = owned;
// Lazy reuse: keep the same ArrayPool buffer across chunks within this serialize
// lifecycle. Only re-rent if the existing buffer is too small for the next chunk.
// Released exactly once in Flush() — no per-chunk rent/return churn even when this
// (rare) path actually fires.
if (_ownedBuffer == null || _ownedBuffer.Length < totalRequest)
{
if (_ownedBuffer != null) ArrayPool<byte>.Shared.Return(_ownedBuffer);
_ownedBuffer = ArrayPool<byte>.Shared.Rent(totalRequest);
EmitDiagnostic($"AcquireChunk[ob-rent]: rented={_ownedBuffer.Length} (totalRequest={totalRequest})");
}
else
{
EmitDiagnostic($"AcquireChunk[ob-reuse]: reused={_ownedBuffer.Length} (totalRequest={totalRequest})");
}
buffer = _ownedBuffer;
position = headerOffset;
bufferEnd = headerOffset + dataSize;
_ownedBuffer = true;
EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]");
_hasOwnedBuffer = true;
}
}
}