From 476a1827f4c9921df2747339676fc60907d8d271 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Mon, 11 Feb 2019 11:15:36 -0800 Subject: [PATCH] Made changes to stream <-> pipe adapters (#7407) - Use the array pool by default when the shared memory pool is specified for both the StreamPipeReader and StreamPipeWriter - Support allocating unpooled memory if the StreamPipeWriter is asked for memory outside of the max pool size --- src/Http/Http/src/BufferSegment.cs | 45 ++++++++++++++-- src/Http/Http/src/StreamPipeReader.cs | 32 +++++++----- src/Http/Http/src/StreamPipeWriter.cs | 57 +++++++++++++++------ src/Http/Http/test/StreamPipeReaderTests.cs | 27 +++++++++- src/Http/Http/test/StreamPipeTest.cs | 8 ++- src/Http/Http/test/StreamPipeWriterTests.cs | 18 +++++++ 6 files changed, 152 insertions(+), 35 deletions(-) diff --git a/src/Http/Http/src/BufferSegment.cs b/src/Http/Http/src/BufferSegment.cs index ba037babc9..3b04a1b1ec 100644 --- a/src/Http/Http/src/BufferSegment.cs +++ b/src/Http/Http/src/BufferSegment.cs @@ -9,7 +9,7 @@ namespace System.IO.Pipelines { internal sealed class BufferSegment : ReadOnlySequenceSegment { - private IMemoryOwner _memoryOwner; + private object _memoryOwner; private BufferSegment _next; private int _end; @@ -46,12 +46,40 @@ namespace System.IO.Pipelines } } + public void SetMemory(object memoryOwner) + { + if (memoryOwner is IMemoryOwner owner) + { + SetMemory(owner); + } + else if (memoryOwner is byte[] array) + { + SetMemory(array); + } + else + { + Debug.Fail("Unexpected memoryOwner"); + } + } + public void SetMemory(IMemoryOwner memoryOwner) { _memoryOwner = memoryOwner; - AvailableMemory = _memoryOwner.Memory; + SetUnownedMemory(memoryOwner.Memory); + } + public void SetMemory(byte[] arrayPoolBuffer) + { + _memoryOwner = arrayPoolBuffer; + + SetUnownedMemory(arrayPoolBuffer); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void SetUnownedMemory(Memory memory) + { + AvailableMemory = memory; RunningIndex = 0; End = 0; NextSegment = null; @@ -59,12 +87,21 @@ namespace System.IO.Pipelines public void ResetMemory() { - _memoryOwner.Dispose(); + if (_memoryOwner is IMemoryOwner owner) + { + owner.Dispose(); + } + else if (_memoryOwner is byte[] array) + { + ArrayPool.Shared.Return(array); + } + _memoryOwner = null; AvailableMemory = default; } - internal IMemoryOwner MemoryOwner => _memoryOwner; + // Exposed for testing + internal object MemoryOwner => _memoryOwner; public Memory AvailableMemory { get; private set; } diff --git a/src/Http/Http/src/StreamPipeReader.cs b/src/Http/Http/src/StreamPipeReader.cs index 36e4f0e21a..db19ac8313 100644 --- a/src/Http/Http/src/StreamPipeReader.cs +++ b/src/Http/Http/src/StreamPipeReader.cs @@ -15,7 +15,7 @@ namespace System.IO.Pipelines /// public class StreamPipeReader : PipeReader, IDisposable { - private readonly int _minimumSegmentSize; + private readonly int _bufferSize; private readonly int _minimumReadThreshold; private readonly MemoryPool _pool; @@ -60,9 +60,9 @@ namespace System.IO.Pipelines throw new ArgumentOutOfRangeException(nameof(options.MinimumReadThreshold)); } - _minimumSegmentSize = options.MinimumSegmentSize; _minimumReadThreshold = Math.Min(options.MinimumReadThreshold, options.MinimumSegmentSize); - _pool = options.MemoryPool; + _pool = options.MemoryPool == MemoryPool.Shared ? null : options.MemoryPool; + _bufferSize = _pool == null ? options.MinimumSegmentSize : Math.Min(options.MinimumSegmentSize, _pool.MaxBufferSize); } /// @@ -272,7 +272,7 @@ namespace System.IO.Pipelines private void ClearCancellationToken() { - lock(_lock) + lock (_lock) { _internalTokenSource = null; } @@ -328,8 +328,7 @@ namespace System.IO.Pipelines if (_readHead == null) { Debug.Assert(_readTail == null); - _readHead = CreateBufferSegment(); - _readHead.SetMemory(_pool.Rent(GetSegmentSize())); + _readHead = AllocateSegment(); _readTail = _readHead; } else if (_readTail.WritableBytes < _minimumReadThreshold) @@ -340,18 +339,25 @@ namespace System.IO.Pipelines private void CreateNewTailSegment() { - var nextSegment = CreateBufferSegment(); - nextSegment.SetMemory(_pool.Rent(GetSegmentSize())); + BufferSegment nextSegment = AllocateSegment(); _readTail.SetNext(nextSegment); _readTail = nextSegment; } - private int GetSegmentSize() => Math.Min(_pool.MaxBufferSize, _minimumSegmentSize); - - private BufferSegment CreateBufferSegment() + private BufferSegment AllocateSegment() { - // TODO this can pool buffer segment objects - return new BufferSegment(); + var nextSegment = new BufferSegment(); + + if (_pool is null) + { + nextSegment.SetMemory(ArrayPool.Shared.Rent(_bufferSize)); + } + else + { + nextSegment.SetMemory(_pool.Rent(_bufferSize)); + } + + return nextSegment; } private void Cancel() diff --git a/src/Http/Http/src/StreamPipeWriter.cs b/src/Http/Http/src/StreamPipeWriter.cs index 549f2b521a..087a2b53b6 100644 --- a/src/Http/Http/src/StreamPipeWriter.cs +++ b/src/Http/Http/src/StreamPipeWriter.cs @@ -18,7 +18,7 @@ namespace System.IO.Pipelines private List _completedSegments; private Memory _currentSegment; - private IMemoryOwner _currentSegmentOwner; + private object _currentSegmentOwner; private MemoryPool _pool; private int _position; @@ -53,7 +53,7 @@ namespace System.IO.Pipelines { _minimumSegmentSize = minimumSegmentSize; InnerStream = writingStream; - _pool = pool ?? MemoryPool.Shared; + _pool = pool == MemoryPool.Shared ? null : pool; } /// @@ -162,7 +162,7 @@ namespace System.IO.Pipelines { // Write all completed segments and whatever remains in the current segment // and flush the result. - CancellationTokenRegistration reg = new CancellationTokenRegistration(); + var reg = new CancellationTokenRegistration(); if (cancellationToken.CanBeCanceled) { reg = cancellationToken.Register(state => ((StreamPipeWriter)state).Cancel(), this); @@ -260,13 +260,27 @@ namespace System.IO.Pipelines // Position might be less than the segment length if there wasn't enough space to satisfy the sizeHint when // GetMemory was called. In that case we'll take the current segment and call it "completed", but need to // ignore any empty space in it. - _completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _position)); + _completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _currentSegment, _position)); + } + + if (_pool is null) + { + _currentSegment = ArrayPool.Shared.Rent(Math.Max(sizeHint, _minimumSegmentSize)); + _currentSegmentOwner = _currentSegment; + } + else if (sizeHint <= _pool.MaxBufferSize) + { + // Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment. + // Also, the size cannot be larger than the MaxBufferSize of the MemoryPool + var owner = _pool.Rent(Math.Clamp(sizeHint, _minimumSegmentSize, _pool.MaxBufferSize)); + _currentSegment = owner.Memory; + _currentSegmentOwner = owner; + } + else + { + _currentSegment = new byte[sizeHint]; } - // Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment. - // Also, the size cannot be larger than the MaxBufferSize of the MemoryPool - _currentSegmentOwner = _pool.Rent(Math.Clamp(sizeHint, _minimumSegmentSize, _pool.MaxBufferSize)); - _currentSegment = _currentSegmentOwner.Memory; _position = 0; } @@ -289,7 +303,19 @@ namespace System.IO.Pipelines } } - _currentSegmentOwner?.Dispose(); + DisposeOwner(_currentSegmentOwner); + } + + private static void DisposeOwner(object owner) + { + if (owner is IMemoryOwner memoryOwner) + { + memoryOwner.Dispose(); + } + else if (owner is byte[] array) + { + ArrayPool.Shared.Return(array); + } } /// @@ -297,23 +323,24 @@ namespace System.IO.Pipelines /// private readonly struct CompletedBuffer { + private readonly object _memoryOwner; + public Memory Buffer { get; } public int Length { get; } public ReadOnlySpan Span => Buffer.Span; - private readonly IMemoryOwner _memoryOwner; - - public CompletedBuffer(IMemoryOwner buffer, int length) + public CompletedBuffer(object owner, Memory buffer, int length) { - Buffer = buffer.Memory; + _memoryOwner = owner; + + Buffer = buffer; Length = length; - _memoryOwner = buffer; } public void Return() { - _memoryOwner.Dispose(); + DisposeOwner(_memoryOwner); } } } diff --git a/src/Http/Http/test/StreamPipeReaderTests.cs b/src/Http/Http/test/StreamPipeReaderTests.cs index 8399060c0d..dc68506a05 100644 --- a/src/Http/Http/test/StreamPipeReaderTests.cs +++ b/src/Http/Http/test/StreamPipeReaderTests.cs @@ -4,6 +4,7 @@ using System.Buffers; using System.Collections.Generic; using System.Linq; +using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -372,6 +373,30 @@ namespace System.IO.Pipelines.Tests Assert.True(IsTaskWithResult(task)); } + [Fact] + public async Task ArrayPoolUsedByDefault() + { + WriteByteArray(20); + var reader = new StreamPipeReader(Stream); + var result = await reader.ReadAsync(); + + SequenceMarshal.TryGetReadOnlySequenceSegment( + result.Buffer, + out var startSegment, + out var startIndex, + out var endSegment, + out var endIndex); + + var start = (BufferSegment)startSegment; + var end = (BufferSegment)endSegment; + + Assert.Same(start, end); + Assert.IsType(start.MemoryOwner); + + reader.AdvanceTo(result.Buffer.End); + reader.Complete(); + } + [Fact] public void CancelledReadAsyncReturnsTaskWithValue() { @@ -593,7 +618,7 @@ namespace System.IO.Pipelines.Tests public async Task ReadAsyncWithEmptyDataCompletesStream() { WriteByteArray(0); - + var readResult = await Reader.ReadAsync(); Assert.True(readResult.IsCompleted); diff --git a/src/Http/Http/test/StreamPipeTest.cs b/src/Http/Http/test/StreamPipeTest.cs index 86b554c403..f75c5053f1 100644 --- a/src/Http/Http/test/StreamPipeTest.cs +++ b/src/Http/Http/test/StreamPipeTest.cs @@ -17,17 +17,21 @@ namespace System.IO.Pipelines.Tests public PipeReader Reader { get; set; } + public TestMemoryPool Pool { get; set; } + protected StreamPipeTest() { + Pool = new TestMemoryPool(); Stream = new MemoryStream(); - Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, new TestMemoryPool()); - Reader = new StreamPipeReader(Stream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, new TestMemoryPool())); + Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, Pool); + Reader = new StreamPipeReader(Stream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, Pool)); } public void Dispose() { Writer.Complete(); Reader.Complete(); + Pool.Dispose(); } public byte[] Read() diff --git a/src/Http/Http/test/StreamPipeWriterTests.cs b/src/Http/Http/test/StreamPipeWriterTests.cs index 6027359467..43c8166e76 100644 --- a/src/Http/Http/test/StreamPipeWriterTests.cs +++ b/src/Http/Http/test/StreamPipeWriterTests.cs @@ -320,6 +320,24 @@ namespace System.IO.Pipelines.Tests Writer.Complete(new Exception()); } + [Fact] + public void GetMemorySameAsTheMaxPoolSizeUsesThePool() + { + var memory = Writer.GetMemory(Pool.MaxBufferSize); + + Assert.Equal(Pool.MaxBufferSize, memory.Length); + Assert.Equal(1, Pool.GetRentCount()); + } + + [Fact] + public void GetMemoryBiggerThanPoolSizeAllocatesUnpooledArray() + { + var memory = Writer.GetMemory(Pool.MaxBufferSize + 1); + + Assert.Equal(Pool.MaxBufferSize + 1, memory.Length); + Assert.Equal(0, Pool.GetRentCount()); + } + [Fact] public void CallComplete_GetMemoryThrows() {