Made changes to stream <-> pipe adapters (#7407)

- Use the array pool by default when the shared memory pool is specified for both the StreamPipeReader and StreamPipeWriter
- Support allocating unpooled memory if the StreamPipeWriter is asked for memory outside of the max pool size
This commit is contained in:
David Fowler 2019-02-11 11:15:36 -08:00 committed by GitHub
parent 3276870d6a
commit 476a1827f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 35 deletions

View File

@ -9,7 +9,7 @@ namespace System.IO.Pipelines
{
internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
private IMemoryOwner<byte> _memoryOwner;
private object _memoryOwner;
private BufferSegment _next;
private int _end;
@ -46,12 +46,40 @@ namespace System.IO.Pipelines
}
}
public void SetMemory(object memoryOwner)
{
if (memoryOwner is IMemoryOwner<byte> owner)
{
SetMemory(owner);
}
else if (memoryOwner is byte[] array)
{
SetMemory(array);
}
else
{
Debug.Fail("Unexpected memoryOwner");
}
}
public void SetMemory(IMemoryOwner<byte> memoryOwner)
{
_memoryOwner = memoryOwner;
AvailableMemory = _memoryOwner.Memory;
SetUnownedMemory(memoryOwner.Memory);
}
public void SetMemory(byte[] arrayPoolBuffer)
{
_memoryOwner = arrayPoolBuffer;
SetUnownedMemory(arrayPoolBuffer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetUnownedMemory(Memory<byte> memory)
{
AvailableMemory = memory;
RunningIndex = 0;
End = 0;
NextSegment = null;
@ -59,12 +87,21 @@ namespace System.IO.Pipelines
public void ResetMemory()
{
_memoryOwner.Dispose();
if (_memoryOwner is IMemoryOwner<byte> owner)
{
owner.Dispose();
}
else if (_memoryOwner is byte[] array)
{
ArrayPool<byte>.Shared.Return(array);
}
_memoryOwner = null;
AvailableMemory = default;
}
internal IMemoryOwner<byte> MemoryOwner => _memoryOwner;
// Exposed for testing
internal object MemoryOwner => _memoryOwner;
public Memory<byte> AvailableMemory { get; private set; }

View File

@ -15,7 +15,7 @@ namespace System.IO.Pipelines
/// </summary>
public class StreamPipeReader : PipeReader, IDisposable
{
private readonly int _minimumSegmentSize;
private readonly int _bufferSize;
private readonly int _minimumReadThreshold;
private readonly MemoryPool<byte> _pool;
@ -60,9 +60,9 @@ namespace System.IO.Pipelines
throw new ArgumentOutOfRangeException(nameof(options.MinimumReadThreshold));
}
_minimumSegmentSize = options.MinimumSegmentSize;
_minimumReadThreshold = Math.Min(options.MinimumReadThreshold, options.MinimumSegmentSize);
_pool = options.MemoryPool;
_pool = options.MemoryPool == MemoryPool<byte>.Shared ? null : options.MemoryPool;
_bufferSize = _pool == null ? options.MinimumSegmentSize : Math.Min(options.MinimumSegmentSize, _pool.MaxBufferSize);
}
/// <summary>
@ -272,7 +272,7 @@ namespace System.IO.Pipelines
private void ClearCancellationToken()
{
lock(_lock)
lock (_lock)
{
_internalTokenSource = null;
}
@ -328,8 +328,7 @@ namespace System.IO.Pipelines
if (_readHead == null)
{
Debug.Assert(_readTail == null);
_readHead = CreateBufferSegment();
_readHead.SetMemory(_pool.Rent(GetSegmentSize()));
_readHead = AllocateSegment();
_readTail = _readHead;
}
else if (_readTail.WritableBytes < _minimumReadThreshold)
@ -340,18 +339,25 @@ namespace System.IO.Pipelines
private void CreateNewTailSegment()
{
var nextSegment = CreateBufferSegment();
nextSegment.SetMemory(_pool.Rent(GetSegmentSize()));
BufferSegment nextSegment = AllocateSegment();
_readTail.SetNext(nextSegment);
_readTail = nextSegment;
}
private int GetSegmentSize() => Math.Min(_pool.MaxBufferSize, _minimumSegmentSize);
private BufferSegment CreateBufferSegment()
private BufferSegment AllocateSegment()
{
// TODO this can pool buffer segment objects
return new BufferSegment();
var nextSegment = new BufferSegment();
if (_pool is null)
{
nextSegment.SetMemory(ArrayPool<byte>.Shared.Rent(_bufferSize));
}
else
{
nextSegment.SetMemory(_pool.Rent(_bufferSize));
}
return nextSegment;
}
private void Cancel()

View File

@ -18,7 +18,7 @@ namespace System.IO.Pipelines
private List<CompletedBuffer> _completedSegments;
private Memory<byte> _currentSegment;
private IMemoryOwner<byte> _currentSegmentOwner;
private object _currentSegmentOwner;
private MemoryPool<byte> _pool;
private int _position;
@ -53,7 +53,7 @@ namespace System.IO.Pipelines
{
_minimumSegmentSize = minimumSegmentSize;
InnerStream = writingStream;
_pool = pool ?? MemoryPool<byte>.Shared;
_pool = pool == MemoryPool<byte>.Shared ? null : pool;
}
/// <summary>
@ -162,7 +162,7 @@ namespace System.IO.Pipelines
{
// Write all completed segments and whatever remains in the current segment
// and flush the result.
CancellationTokenRegistration reg = new CancellationTokenRegistration();
var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.Register(state => ((StreamPipeWriter)state).Cancel(), this);
@ -260,13 +260,27 @@ namespace System.IO.Pipelines
// Position might be less than the segment length if there wasn't enough space to satisfy the sizeHint when
// GetMemory was called. In that case we'll take the current segment and call it "completed", but need to
// ignore any empty space in it.
_completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _position));
_completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _currentSegment, _position));
}
if (_pool is null)
{
_currentSegment = ArrayPool<byte>.Shared.Rent(Math.Max(sizeHint, _minimumSegmentSize));
_currentSegmentOwner = _currentSegment;
}
else if (sizeHint <= _pool.MaxBufferSize)
{
// Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment.
// Also, the size cannot be larger than the MaxBufferSize of the MemoryPool
var owner = _pool.Rent(Math.Clamp(sizeHint, _minimumSegmentSize, _pool.MaxBufferSize));
_currentSegment = owner.Memory;
_currentSegmentOwner = owner;
}
else
{
_currentSegment = new byte[sizeHint];
}
// Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment.
// Also, the size cannot be larger than the MaxBufferSize of the MemoryPool
_currentSegmentOwner = _pool.Rent(Math.Clamp(sizeHint, _minimumSegmentSize, _pool.MaxBufferSize));
_currentSegment = _currentSegmentOwner.Memory;
_position = 0;
}
@ -289,7 +303,19 @@ namespace System.IO.Pipelines
}
}
_currentSegmentOwner?.Dispose();
DisposeOwner(_currentSegmentOwner);
}
private static void DisposeOwner(object owner)
{
if (owner is IMemoryOwner<byte> memoryOwner)
{
memoryOwner.Dispose();
}
else if (owner is byte[] array)
{
ArrayPool<byte>.Shared.Return(array);
}
}
/// <summary>
@ -297,23 +323,24 @@ namespace System.IO.Pipelines
/// </summary>
private readonly struct CompletedBuffer
{
private readonly object _memoryOwner;
public Memory<byte> Buffer { get; }
public int Length { get; }
public ReadOnlySpan<byte> Span => Buffer.Span;
private readonly IMemoryOwner<byte> _memoryOwner;
public CompletedBuffer(IMemoryOwner<byte> buffer, int length)
public CompletedBuffer(object owner, Memory<byte> buffer, int length)
{
Buffer = buffer.Memory;
_memoryOwner = owner;
Buffer = buffer;
Length = length;
_memoryOwner = buffer;
}
public void Return()
{
_memoryOwner.Dispose();
DisposeOwner(_memoryOwner);
}
}
}

View File

@ -4,6 +4,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -372,6 +373,30 @@ namespace System.IO.Pipelines.Tests
Assert.True(IsTaskWithResult(task));
}
[Fact]
public async Task ArrayPoolUsedByDefault()
{
WriteByteArray(20);
var reader = new StreamPipeReader(Stream);
var result = await reader.ReadAsync();
SequenceMarshal.TryGetReadOnlySequenceSegment(
result.Buffer,
out var startSegment,
out var startIndex,
out var endSegment,
out var endIndex);
var start = (BufferSegment)startSegment;
var end = (BufferSegment)endSegment;
Assert.Same(start, end);
Assert.IsType<byte[]>(start.MemoryOwner);
reader.AdvanceTo(result.Buffer.End);
reader.Complete();
}
[Fact]
public void CancelledReadAsyncReturnsTaskWithValue()
{
@ -593,7 +618,7 @@ namespace System.IO.Pipelines.Tests
public async Task ReadAsyncWithEmptyDataCompletesStream()
{
WriteByteArray(0);
var readResult = await Reader.ReadAsync();
Assert.True(readResult.IsCompleted);

View File

@ -17,17 +17,21 @@ namespace System.IO.Pipelines.Tests
public PipeReader Reader { get; set; }
public TestMemoryPool Pool { get; set; }
protected StreamPipeTest()
{
Pool = new TestMemoryPool();
Stream = new MemoryStream();
Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, new TestMemoryPool());
Reader = new StreamPipeReader(Stream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, new TestMemoryPool()));
Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, Pool);
Reader = new StreamPipeReader(Stream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, Pool));
}
public void Dispose()
{
Writer.Complete();
Reader.Complete();
Pool.Dispose();
}
public byte[] Read()

View File

@ -320,6 +320,24 @@ namespace System.IO.Pipelines.Tests
Writer.Complete(new Exception());
}
[Fact]
public void GetMemorySameAsTheMaxPoolSizeUsesThePool()
{
var memory = Writer.GetMemory(Pool.MaxBufferSize);
Assert.Equal(Pool.MaxBufferSize, memory.Length);
Assert.Equal(1, Pool.GetRentCount());
}
[Fact]
public void GetMemoryBiggerThanPoolSizeAllocatesUnpooledArray()
{
var memory = Writer.GetMemory(Pool.MaxBufferSize + 1);
Assert.Equal(Pool.MaxBufferSize + 1, memory.Length);
Assert.Equal(0, Pool.GetRentCount());
}
[Fact]
public void CallComplete_GetMemoryThrows()
{