From 96a2f905355c5ab129c877273b289731d622b6a5 Mon Sep 17 00:00:00 2001 From: Loretta Date: Thu, 30 Apr 2026 06:04:28 +0200 Subject: [PATCH] [LOADED_DOCS: 3 files, no new loads] Refactor AsyncPipeWriterOutput buffer management Refactored AsyncPipeWriterOutput to lazily allocate and reuse the fallback ArrayPool buffer across a serialize lifecycle, releasing it only once at the end. Replaced the _ownedBuffer boolean with _hasOwnedBuffer and a nullable _ownedBuffer field. Centralized buffer release logic, updated diagnostics, and improved chunk acquisition to minimize ArrayPool churn and clarify buffer ownership semantics. --- .../Binaries/AsyncPipeWriterOutput.cs | 88 +++++++++++++++---- 1 file changed, 69 insertions(+), 19 deletions(-) diff --git a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs index f29d66f..eb1e523 100644 --- a/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs +++ b/AyCode.Core/Serializers/Binaries/AsyncPipeWriterOutput.cs @@ -85,7 +85,14 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase private readonly TimeSpan _flushTimeout; private int _committedBytes; private int _currentChunkStart; - private bool _ownedBuffer; + // Whether the current chunk's buffer is the owned ArrayPool fallback (true) or a zero-copy + // PipeWriter slab (false). Used by CommitCurrentChunk to pick the commit strategy. + private bool _hasOwnedBuffer; + // Lazy-allocated, long-lived ArrayPool buffer used as the fallback when PipeWriter.GetMemory + // returns non-array-backed memory (custom non-byte[] PipeWriter — practically never on + // production writers). Reused across chunks within a single serialize lifecycle; returned + // to the pool exactly once in Flush(). Stays null entirely on the zero-copy path. + private byte[]? _ownedBuffer; private ValueTask _lastFlush; /// @@ -133,7 +140,8 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase _serializeFlushAndAcquire = StreamPipeWriterType.IsInstanceOfType(pipeWriter); _committedBytes = 0; - _ownedBuffer = false; + _hasOwnedBuffer = false; + _ownedBuffer = null; _lastFlush = default; } @@ -233,17 +241,43 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase SyncAwaitFlush(_lastFlush); CommitCurrentChunk(buffer, position); + + // End of serialize lifecycle — return the owned fallback buffer to ArrayPool exactly + // once (NOT per chunk). The buffer was reused across all chunks in this lifecycle; + // releasing it now avoids per-chunk rent/return churn even when the fallback path + // actually fires. + ReleaseOwnedBuffer(); } /// - /// No-op for PipeWriter-based output — chunks are owned by PipeWriter, not us. + /// Releases the lazy-allocated owned fallback buffer (if any) back to . + /// Idempotent — safe to call multiple times. Called from at the end of a + /// successful serialize cycle, and also from for defensive cleanup if a + /// caller reuses the struct. /// - public void Reset() { } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ReleaseOwnedBuffer() + { + if (_ownedBuffer != null) + { + ArrayPool.Shared.Return(_ownedBuffer); + _ownedBuffer = null; + } + _hasOwnedBuffer = false; + } + + /// + /// Defensive cleanup — releases the owned fallback buffer if one was retained. Normally a + /// no-op for PipeWriter-based output (chunks are owned by PipeWriter), but covers the rare + /// case where the same struct instance is reused after a serialize without going through Flush. + /// + public void Reset() => ReleaseOwnedBuffer(); /// /// Commits the current chunk to the PipeWriter. In framed mode, patches the reserved /// [201][UINT16 dataBytes] header before Advance; in raw mode, simply Advances the data. - /// For owned buffers, copies to PipeWriter first. + /// For owned buffers, copies to PipeWriter first (without releasing the buffer — Flush() does + /// that once at the end). /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private void CommitCurrentChunk(byte[] buffer, int position) @@ -258,32 +292,35 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(headerStart + 1, 2), (ushort)dataBytes); - EmitDiagnostic($"CommitCurrentChunk[framed]: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({HeaderSize + dataBytes})"); + EmitDiagnostic($"CommitCurrentChunk[framed]: dataBytes={dataBytes} headerStart={headerStart} _currentChunkStart={_currentChunkStart} position={position} _hasOwnedBuffer={_hasOwnedBuffer} → Advance({HeaderSize + dataBytes})"); - if (_ownedBuffer) FlushOwnedBuffer(buffer, headerStart, HeaderSize + dataBytes); + if (_hasOwnedBuffer) CopyOwnedToPipeWriter(buffer, headerStart, HeaderSize + dataBytes); else _pipeWriter.Advance(HeaderSize + dataBytes); } else { - EmitDiagnostic($"CommitCurrentChunk[raw]: dataBytes={dataBytes} _currentChunkStart={_currentChunkStart} position={position} _ownedBuffer={_ownedBuffer} → Advance({dataBytes})"); + EmitDiagnostic($"CommitCurrentChunk[raw]: dataBytes={dataBytes} _currentChunkStart={_currentChunkStart} position={position} _hasOwnedBuffer={_hasOwnedBuffer} → Advance({dataBytes})"); - if (_ownedBuffer) FlushOwnedBuffer(buffer, _currentChunkStart, dataBytes); + if (_hasOwnedBuffer) CopyOwnedToPipeWriter(buffer, _currentChunkStart, dataBytes); else _pipeWriter.Advance(dataBytes); } _committedBytes += dataBytes; // only count data bytes, not framing } + /// + /// Copies the owned-buffer chunk into the PipeWriter (since GetMemory returned non-array-backed + /// memory and the serializer needs byte[]). Does NOT return the buffer to the pool — + /// the reference stays alive across chunks within this serialize + /// lifecycle; releases it exactly once at the end. + /// [MethodImpl(MethodImplOptions.NoInlining)] - private void FlushOwnedBuffer(byte[] buffer, int start, int length) + private void CopyOwnedToPipeWriter(byte[] buffer, int start, int length) { var span = _pipeWriter.GetSpan(length); buffer.AsSpan(start, length).CopyTo(span); _pipeWriter.Advance(length); - - ArrayPool.Shared.Return(buffer); - _ownedBuffer = false; } private void AcquireChunk(int requestSize, out byte[] buffer, out int position, out int bufferEnd) @@ -303,19 +340,32 @@ public struct AsyncPipeWriterOutput : IBinaryOutputBase buffer = segment.Array; position = segment.Offset + headerOffset; bufferEnd = segment.Offset + headerOffset + dataSize; - _ownedBuffer = false; + _hasOwnedBuffer = false; EmitDiagnostic($"AcquireChunk[zc]: segment.Array.Length={segment.Array.Length} segment.Offset={segment.Offset} segment.Count={segment.Count} → buffer[{position}..{bufferEnd}]"); } else { - var owned = ArrayPool.Shared.Rent(totalRequest); - buffer = owned; + // Lazy reuse: keep the same ArrayPool buffer across chunks within this serialize + // lifecycle. Only re-rent if the existing buffer is too small for the next chunk. + // Released exactly once in Flush() — no per-chunk rent/return churn even when this + // (rare) path actually fires. + if (_ownedBuffer == null || _ownedBuffer.Length < totalRequest) + { + if (_ownedBuffer != null) ArrayPool.Shared.Return(_ownedBuffer); + _ownedBuffer = ArrayPool.Shared.Rent(totalRequest); + + EmitDiagnostic($"AcquireChunk[ob-rent]: rented={_ownedBuffer.Length} (totalRequest={totalRequest})"); + } + else + { + EmitDiagnostic($"AcquireChunk[ob-reuse]: reused={_ownedBuffer.Length} (totalRequest={totalRequest})"); + } + + buffer = _ownedBuffer; position = headerOffset; bufferEnd = headerOffset + dataSize; - _ownedBuffer = true; - - EmitDiagnostic($"AcquireChunk[ob]: rented={owned.Length} → buffer[{position}..{bufferEnd}]"); + _hasOwnedBuffer = true; } } }