Implement Stream Adapters (and minor nitpics) (#6583)

This commit is contained in:
Justin Kotalik 2019-01-15 09:00:06 -08:00 committed by GitHub
parent 6a44aca6a2
commit cdd38d70cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1219 additions and 47 deletions

View File

@ -22,7 +22,6 @@ Microsoft.AspNetCore.Http.HttpResponse</Description>
<Reference Include="Microsoft.AspNetCore.Http.Features" />
<Reference Include="Microsoft.Extensions.ActivatorUtilities.Sources" PrivateAssets="All" />
<Reference Include="System.Text.Encodings.Web" />
<Reference Include="System.IO.Pipelines" />
</ItemGroup>
</Project>

View File

@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Http.Features
public interface IRequestBodyPipeFeature
{
/// <summary>
/// A <see cref="PipeWriter"/> representing the request body, if any.
/// A <see cref="PipeReader"/> representing the request body, if any.
/// </summary>
PipeReader RequestBodyPipe { get; set; }
}

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netcoreapp3.0;net461</TargetFrameworks>
<TargetFramework>netcoreapp3.0</TargetFramework>
</PropertyGroup>
<ItemGroup>

View File

@ -7,7 +7,7 @@ using System.Runtime.CompilerServices;
namespace System.IO.Pipelines
{
public sealed class BufferSegment : ReadOnlySequenceSegment<byte>
internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
private IMemoryOwner<byte> _memoryOwner;
private BufferSegment _next;

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>ASP.NET Core default HTTP feature implementations.</Description>
@ -19,7 +19,6 @@
<Reference Include="Microsoft.Extensions.ObjectPool" />
<Reference Include="Microsoft.Extensions.Options" />
<Reference Include="Microsoft.Net.Http.Headers" />
<Reference Include="System.IO.Pipelines" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Microsoft.AspNetCore.Http.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]

View File

@ -0,0 +1,240 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
/// <summary>
/// Represents a read-only Stream backed by a PipeReader
/// </summary>
public class ReadOnlyPipeStream : Stream
{
private readonly PipeReader _pipeReader;
private bool _allowSynchronousIO = true;
/// <summary>
/// Creates a new ReadOnlyPipeStream
/// </summary>
/// <param name="pipeReader">The PipeReader to read from.</param>
public ReadOnlyPipeStream(PipeReader pipeReader) :
this(pipeReader, allowSynchronousIO: true)
{
}
/// <summary>
/// Creates a new ReadOnlyPipeStream
/// </summary>
/// <param name="pipeReader">The PipeReader to read from.</param>
/// <param name="allowSynchronousIO">Whether synchronous IO is allowed.</param>
public ReadOnlyPipeStream(PipeReader pipeReader, bool allowSynchronousIO)
{
_allowSynchronousIO = allowSynchronousIO;
_pipeReader = pipeReader;
}
/// <inheritdoc />
public override bool CanSeek => false;
/// <inheritdoc />
public override bool CanRead => true;
/// <inheritdoc />
public override bool CanWrite => false;
/// <inheritdoc />
public override long Length => throw new NotSupportedException();
/// <inheritdoc />
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc />
public override int WriteTimeout
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
/// <inheritdoc />
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotSupportedException();
/// <inheritdoc />
public override void Flush()
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override Task FlushAsync(CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override void SetLength(long value)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
if (!_allowSynchronousIO)
{
ThrowHelper.ThrowInvalidOperationException_SynchronousReadsDisallowed();
}
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}
/// <inheritdoc />
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = ReadAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
/// <inheritdoc />
public override int EndRead(IAsyncResult asyncResult)
{
return ((Task<int>)asyncResult).GetAwaiter().GetResult();
}
private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<int>(state);
var task = ReadAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(task2.Result);
}
}, tcs, cancellationToken);
return tcs.Task;
}
/// <inheritdoc />
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
/// <inheritdoc />
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(destination, cancellationToken);
}
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
while (true)
{
var result = await _pipeReader.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;
var consumed = readableBuffer.End;
var actual = 0;
try
{
if (readableBufferLength != 0)
{
actual = (int)Math.Min(readableBufferLength, buffer.Length);
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);
return actual;
}
if (result.IsCompleted)
{
return 0;
}
}
finally
{
_pipeReader.AdvanceTo(consumed);
}
}
}
/// <inheritdoc />
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
if (destination == null)
{
throw new ArgumentNullException(nameof(destination));
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize));
}
return CopyToAsyncInternal(destination, cancellationToken);
}
private async Task CopyToAsyncInternal(Stream destination, CancellationToken cancellationToken)
{
while (true)
{
var result = await _pipeReader.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;
try
{
if (readableBufferLength != 0)
{
foreach (var memory in readableBuffer)
{
await destination.WriteAsync(memory, cancellationToken);
}
}
if (result.IsCompleted)
{
return;
}
}
finally
{
_pipeReader.AdvanceTo(readableBuffer.End);
}
}
}
}
}

View File

@ -25,7 +25,8 @@ namespace System.IO.Pipelines
private readonly MemoryPool<byte> _pool;
private CancellationTokenSource _internalTokenSource;
private bool _isCompleted;
private bool _isReaderCompleted;
private bool _isWriterCompleted;
private ExceptionDispatchInfo _exceptionInfo;
private BufferSegment _readHead;
@ -182,12 +183,12 @@ namespace System.IO.Pipelines
/// <inheritdoc />
public override void Complete(Exception exception = null)
{
if (_isCompleted)
if (_isReaderCompleted)
{
return;
}
_isCompleted = true;
_isReaderCompleted = true;
if (exception != null)
{
_exceptionInfo = ExceptionDispatchInfo.Capture(exception);
@ -248,6 +249,11 @@ namespace System.IO.Pipelines
_readTail.End += length;
_bufferedBytes += length;
if (length == 0)
{
_isWriterCompleted = true;
}
}
catch (OperationCanceledException)
{
@ -275,7 +281,7 @@ namespace System.IO.Pipelines
private void ThrowIfCompleted()
{
if (_isCompleted)
if (_isReaderCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
@ -357,7 +363,7 @@ namespace System.IO.Pipelines
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsCompletedOrThrow()
{
if (!_isCompleted)
if (!_isWriterCompleted)
{
return false;
}

View File

@ -65,7 +65,7 @@ namespace System.IO.Pipelines
}
/// <summary>
/// Gets the inner stream that is being read from.
/// Gets the inner stream that is being written to.
/// </summary>
public Stream InnerStream => _writingStream;

View File

@ -19,5 +19,17 @@ namespace System.IO.Pipelines
public static void ThrowInvalidOperationException_NoDataRead() => throw CreateInvalidOperationException_NoDataRead();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoDataRead() => new InvalidOperationException("No data has been read into the StreamPipeReader.");
public static void ThrowInvalidOperationException_SynchronousReadsDisallowed() => throw CreateInvalidOperationException_SynchronousReadsDisallowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_SynchronousReadsDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call ReadAsync or set allowSynchronousIO to true instead.");
public static void ThrowInvalidOperationException_SynchronousWritesDisallowed() => throw CreateInvalidOperationException_SynchronousWritesDisallowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_SynchronousWritesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call WriteAsync or set allowSynchronousIO to true instead.");
public static void ThrowInvalidOperationException_SynchronousFlushesDisallowed() => throw CreateInvalidOperationException_SynchronousFlushesDisallowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_SynchronousFlushesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call FlushAsync or set allowSynchronousIO to true instead.");
}
}

View File

@ -0,0 +1,162 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
/// <summary>
/// Represents a WriteOnlyStream backed by a PipeWriter
/// </summary>
public class WriteOnlyPipeStream : Stream
{
private PipeWriter _pipeWriter;
private bool _allowSynchronousIO = true;
/// <summary>
/// Creates a new WriteOnlyStream
/// </summary>
/// <param name="pipeWriter">The PipeWriter to write to.</param>
public WriteOnlyPipeStream(PipeWriter pipeWriter) :
this(pipeWriter, allowSynchronousIO: true)
{
}
/// <summary>
/// Creates a new WriteOnlyStream
/// </summary>
/// <param name="pipeWriter">The PipeWriter to write to.</param>
/// <param name="allowSynchronousIO">Whether synchronous IO is allowed.</param>
public WriteOnlyPipeStream(PipeWriter pipeWriter, bool allowSynchronousIO)
{
_pipeWriter = pipeWriter;
_allowSynchronousIO = allowSynchronousIO;
}
/// <inheritdoc />
public override bool CanSeek => false;
/// <inheritdoc />
public override bool CanRead => false;
/// <inheritdoc />
public override bool CanWrite => true;
/// <inheritdoc />
public override long Length => throw new NotSupportedException();
/// <inheritdoc />
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc />
public override int ReadTimeout
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
/// <inheritdoc />
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotSupportedException();
/// <inheritdoc />
public override void Flush()
{
if (!_allowSynchronousIO)
{
ThrowHelper.ThrowInvalidOperationException_SynchronousFlushesDisallowed();
}
FlushAsync(default).GetAwaiter().GetResult();
}
/// <inheritdoc />
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await _pipeWriter.FlushAsync(cancellationToken);
}
/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override void SetLength(long value)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
{
if (!_allowSynchronousIO)
{
ThrowHelper.ThrowInvalidOperationException_SynchronousWritesDisallowed();
}
WriteAsync(buffer, offset, count, default).GetAwaiter().GetResult();
}
/// <inheritdoc />
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = WriteAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
/// <inheritdoc />
public override void EndWrite(IAsyncResult asyncResult)
{
((Task<object>)asyncResult).GetAwaiter().GetResult();
}
private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<object>(state);
var task = WriteAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<object>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(null);
}
}, tcs, cancellationToken);
return tcs.Task;
}
/// <inheritdoc />
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
/// <inheritdoc />
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
await _pipeWriter.WriteAsync(source, cancellationToken);
}
}
}

View File

@ -10,7 +10,7 @@ using Xunit;
namespace System.IO.Pipelines.Tests
{
public class FlushResultCancellationTests : PipeTest
public class FlushResultCancellationTests : StreamPipeTest
{
[Fact]
public async Task FlushAsyncWithNewCancellationTokenNotAffectedByPrevious()

View File

@ -0,0 +1,85 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Text;
using System.Threading.Tasks;
namespace System.IO.Pipelines.Tests
{
public class PipeStreamTest : IDisposable
{
public Stream ReadingStream { get; set; }
public Stream WritingStream { get; set; }
public Pipe Pipe { get; set; }
public PipeReader Reader => Pipe.Reader;
public PipeWriter Writer => Pipe.Writer;
public PipeStreamTest()
{
Pipe = new Pipe();
ReadingStream = new ReadOnlyPipeStream(Reader);
WritingStream = new WriteOnlyPipeStream(Writer);
}
public void Dispose()
{
Writer.Complete();
Reader.Complete();
}
public async Task WriteStringToStreamAsync(string input)
{
await WritingStream.WriteAsync(Encoding.ASCII.GetBytes(input));
}
public async Task WriteStringToPipeAsync(string input)
{
await Writer.WriteAsync(Encoding.ASCII.GetBytes(input));
}
public async Task WriteByteArrayToPipeAsync(byte[] input)
{
await Writer.WriteAsync(input);
}
public async Task<string> ReadFromPipeAsStringAsync()
{
var readResult = await Reader.ReadAsync();
var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
Reader.AdvanceTo(readResult.Buffer.End);
return result;
}
public async Task<string> ReadFromStreamAsStringAsync()
{
var memory = new Memory<byte>(new byte[4096]);
var readLength = await ReadingStream.ReadAsync(memory);
var result = Encoding.ASCII.GetString(memory.ToArray(), 0, readLength);
return result;
}
public async Task<byte[]> ReadFromPipeAsByteArrayAsync()
{
var readResult = await Reader.ReadAsync();
var result = readResult.Buffer.ToArray();
Reader.AdvanceTo(readResult.Buffer.End);
return result;
}
public Task<byte[]> ReadFromStreamAsByteArrayAsync(int size)
{
return ReadFromStreamAsByteArrayAsync(size, ReadingStream);
}
public async Task<byte[]> ReadFromStreamAsByteArrayAsync(int size, Stream stream)
{
var memory = new Memory<byte>(new byte[size]);
var readLength = await stream.ReadAsync(memory);
return memory.Slice(0, readLength).ToArray();
}
}
}

View File

@ -12,7 +12,7 @@ using Xunit;
namespace System.IO.Pipelines.Tests
{
public class PipeWriterTests : PipeTest
public class PipeWriterTests : StreamPipeTest
{
[Theory]

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -12,7 +12,7 @@ using Xunit;
namespace System.IO.Pipelines.Tests
{
public class ReadAsyncCancellationTests : PipeTest
public class ReadAsyncCancellationTests : StreamPipeTest
{
[Fact]
public async Task AdvanceShouldResetStateIfReadCanceled()

View File

@ -0,0 +1,164 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class ReadOnlyPipeStreamTests : PipeStreamTest
{
[Fact]
public void CanSeekFalse()
{
Assert.False(ReadingStream.CanSeek);
}
[Fact]
public void CanReadTrue()
{
Assert.True(ReadingStream.CanRead);
}
[Fact]
public void CanWriteFalse()
{
Assert.False(ReadingStream.CanWrite);
}
[Fact]
public void LengthThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Length);
}
[Fact]
public void PositionThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Position);
Assert.Throws<NotSupportedException>(() => ReadingStream.Position = 1);
}
[Fact]
public void SeekThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Seek(0, SeekOrigin.Begin));
}
[Fact]
public void SetLengthThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.SetLength(1));
}
[Fact]
public void WriteThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Write(new byte[1], 0, 1));
}
[Fact]
public async Task WriteAsyncThrows()
{
await Assert.ThrowsAsync<NotSupportedException>(async () => await ReadingStream.WriteAsync(new byte[1], 0, 1));
}
[Fact]
public void ReadTimeoutThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.WriteTimeout = 1);
Assert.Throws<NotSupportedException>(() => ReadingStream.WriteTimeout);
}
[Fact]
public async Task ReadAsyncWorks()
{
var expected = "Hello World!";
await WriteStringToPipeAsync(expected);
Assert.Equal(expected, await ReadFromStreamAsStringAsync());
}
[Fact]
public async Task BasicLargeRead()
{
var expected = new byte[8000];
await WriteByteArrayToPipeAsync(expected);
Assert.Equal(expected, await ReadFromStreamAsByteArrayAsync(8000));
}
[Fact]
public async Task ReadAsyncIsCalledFromCallingRead()
{
var pipeReader = await SetupMockPipeReader();
var stream = new ReadOnlyPipeStream(pipeReader.Object);
stream.Read(new byte[1]);
pipeReader.Verify(m => m.ReadAsync(It.IsAny<CancellationToken>()));
}
[Fact]
public async Task ReadAsyncIsCalledFromCallingReadAsync()
{
var pipeReader = await SetupMockPipeReader();
var stream = new ReadOnlyPipeStream(pipeReader.Object);
await stream.ReadAsync(new byte[1]);
pipeReader.Verify(m => m.ReadAsync(It.IsAny<CancellationToken>()));
}
[Fact]
public async Task ReadAsyncCancellationTokenIsPassedIntoReadAsync()
{
var pipeReader = await SetupMockPipeReader();
var stream = new ReadOnlyPipeStream(pipeReader.Object);
var token = new CancellationToken();
await stream.ReadAsync(new byte[1], token);
pipeReader.Verify(m => m.ReadAsync(token));
}
[Fact]
public async Task CopyToAsyncWorks()
{
const int expectedSize = 8000;
var expected = new byte[expectedSize];
await WriteByteArrayToPipeAsync(expected);
Writer.Complete();
var destStream = new MemoryStream();
await ReadingStream.CopyToAsync(destStream);
Assert.Equal(expectedSize, destStream.Length);
}
[Fact]
public void BlockSyncIOThrows()
{
var readOnlyPipeStream = new ReadOnlyPipeStream(Reader, allowSynchronousIO: false);
Assert.Throws<InvalidOperationException>(() => readOnlyPipeStream.Read(new byte[0], 0, 0));
}
private async Task<Mock<PipeReader>> SetupMockPipeReader()
{
await WriteByteArrayToPipeAsync(new byte[1]);
var pipeReader = new Mock<PipeReader>();
pipeReader
.Setup(m => m.ReadAsync(It.IsAny<CancellationToken>()))
.Returns(new ValueTask<ReadResult>(new ReadResult(new ReadOnlySequence<byte>(new byte[1]), false, false)));
return pipeReader;
}
}
}

View File

@ -0,0 +1,147 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class ReadingAdaptersInteropTests
{
[Fact]
public async Task CheckBasicReadPipeApi()
{
var pipe = new Pipe();
var readStream = new ReadOnlyPipeStream(pipe.Reader);
var pipeReader = new StreamPipeReader(readStream);
await pipe.Writer.WriteAsync(new byte[10]);
var res = await pipeReader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckNestedPipeApi()
{
var pipe = new Pipe();
var reader = pipe.Reader;
for (var i = 0; i < 3; i++)
{
var readStream = new ReadOnlyPipeStream(reader);
reader = new StreamPipeReader(readStream);
}
await pipe.Writer.WriteAsync(new byte[10]);
var res = await reader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckBasicReadStreamApi()
{
var stream = new MemoryStream();
await stream.WriteAsync(new byte[10]);
stream.Position = 0;
var pipeReader = new StreamPipeReader(stream);
var readOnlyStream = new ReadOnlyPipeStream(pipeReader);
var resSize = await readOnlyStream.ReadAsync(new byte[10]);
Assert.Equal(10, resSize);
}
[Fact]
public async Task CheckNestedStreamApi()
{
var stream = new MemoryStream();
await stream.WriteAsync(new byte[10]);
stream.Position = 0;
Stream readOnlyStream = stream;
for (var i = 0; i < 3; i++)
{
var pipeReader = new StreamPipeReader(readOnlyStream);
readOnlyStream = new ReadOnlyPipeStream(pipeReader);
}
var resSize = await readOnlyStream.ReadAsync(new byte[10]);
Assert.Equal(10, resSize);
}
[Fact]
public async Task ReadsCanBeCanceledViaProvidedCancellationToken()
{
var readOnlyStream = new ReadOnlyPipeStream(new HangingPipeReader());
var pipeReader = new StreamPipeReader(readOnlyStream);
var cts = new CancellationTokenSource(1);
await Task.Delay(1);
await Assert.ThrowsAsync<TaskCanceledException>(async () => await pipeReader.ReadAsync(cts.Token));
}
[Fact]
public async Task ReadCanBeCancelledViaCancelPendingReadWhenReadIsAsync()
{
var readOnlyStream = new ReadOnlyPipeStream(new HangingPipeReader());
var pipeReader = new StreamPipeReader(readOnlyStream);
var result = new ReadResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
var readingTask = pipeReader.ReadAsync();
tcs.SetResult(0);
result = await readingTask;
});
await tcs.Task;
pipeReader.CancelPendingRead();
await task;
Assert.True(result.IsCanceled);
}
private class HangingPipeReader : PipeReader
{
public override void AdvanceTo(SequencePosition consumed)
{
throw new NotImplementedException();
}
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
throw new NotImplementedException();
}
public override void CancelPendingRead()
{
throw new NotImplementedException();
}
public override void Complete(Exception exception = null)
{
throw new NotImplementedException();
}
public override void OnWriterCompleted(Action<Exception, object> callback, object state)
{
throw new NotImplementedException();
}
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
await Task.Delay(30000, cancellationToken);
return new ReadResult();
}
public override bool TryRead(out ReadResult result)
{
result = new ReadResult();
return false;
}
}
}
}

View File

@ -11,7 +11,7 @@ using Xunit;
namespace System.IO.Pipelines.Tests
{
public partial class StreamPipeReaderTests : PipeTest
public partial class StreamPipeReaderTests : StreamPipeTest
{
[Fact]
public async Task CanRead()
@ -198,6 +198,8 @@ namespace System.IO.Pipelines.Tests
[Fact]
public async Task ReadAsyncReturnsCanceledInterleaved()
{
Write(new byte[10000]);
// Cancel and Read interleaved to confirm cancellations are independent
for (var i = 0; i < 3; i++)
{
@ -501,22 +503,6 @@ namespace System.IO.Pipelines.Tests
Assert.False(readResult.Buffer.IsSingleSegment);
}
[Fact]
public async Task SetMinimumReadThresholdToMiminumSegmentSizeOnlyGetNewBlockWhenDataIsWritten()
{
CreateReader(minimumReadThreshold: 16);
WriteByteArray(0);
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
WriteByteArray(16);
readResult = await Reader.ReadAsync();
Assert.Equal(16, readResult.Buffer.Length);
Assert.True(readResult.Buffer.IsSingleSegment);
}
[Fact]
public void SetMinimumReadThresholdOfZeroThrows()
{
@ -595,6 +581,24 @@ namespace System.IO.Pipelines.Tests
Assert.Equal("c", ReadFromStreamAsString(buffer));
}
[Fact]
public async Task ReadAsyncWithNoDataCompletesReader()
{
var readResult = await Reader.ReadAsync();
Assert.True(readResult.IsCompleted);
}
[Fact]
public async Task ReadAsyncWithEmptyDataCompletesStream()
{
WriteByteArray(0);
var readResult = await Reader.ReadAsync();
Assert.True(readResult.IsCompleted);
}
private async Task<string> ReadFromPipeAsString()
{
var readResult = await Reader.ReadAsync();

View File

@ -1,14 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;
namespace System.IO.Pipelines.Tests
{
public abstract class PipeTest : IDisposable
public abstract class StreamPipeTest : IDisposable
{
protected const int MaximumSizeHigh = 65;
@ -20,7 +17,7 @@ namespace System.IO.Pipelines.Tests
public PipeReader Reader { get; set; }
protected PipeTest()
protected StreamPipeTest()
{
MemoryStream = new MemoryStream();
Writer = new StreamPipeWriter(MemoryStream, MinimumSegmentSize, new TestMemoryPool());

View File

@ -12,7 +12,7 @@ using Xunit;
namespace System.IO.Pipelines.Tests
{
public class StreamPipeWriterTests : PipeTest
public class StreamPipeWriterTests : StreamPipeTest
{
[Fact]
public async Task CanWriteAsyncMultipleTimesIntoSameBlock()

View File

@ -0,0 +1,199 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class WriteOnlyPipeStreamTests : PipeStreamTest
{
[Fact]
public void CanSeekFalse()
{
Assert.False(WritingStream.CanSeek);
}
[Fact]
public void CanReadFalse()
{
Assert.False(WritingStream.CanRead);
}
[Fact]
public void CanWriteTrue()
{
Assert.True(WritingStream.CanWrite);
}
[Fact]
public void LengthThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Length);
}
[Fact]
public void PositionThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Position);
Assert.Throws<NotSupportedException>(() => WritingStream.Position = 1);
}
[Fact]
public void SeekThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Seek(0, SeekOrigin.Begin));
}
[Fact]
public void SetLengthThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.SetLength(1));
}
[Fact]
public void ReadThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Read(new byte[1], 0, 1));
}
[Fact]
public async Task ReadAsyncThrows()
{
await Assert.ThrowsAsync<NotSupportedException>(async () => await WritingStream.ReadAsync(new byte[1], 0, 1));
}
[Fact]
public void ReadTimeoutThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.ReadTimeout = 1);
Assert.Throws<NotSupportedException>(() => WritingStream.ReadTimeout);
}
[Fact]
public async Task WriteAsyncWithReadOnlyMemoryWorks()
{
var expected = "Hello World!";
await WriteStringToStreamAsync(expected);
Assert.Equal(expected, await ReadFromPipeAsStringAsync());
}
[Fact]
public async Task WriteAsyncWithArrayWorks()
{
var expected = new byte[1];
await WritingStream.WriteAsync(expected, 0, expected.Length);
Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync());
}
[Fact]
public async Task BasicLargeWrite()
{
var expected = new byte[8000];
await WritingStream.WriteAsync(expected);
Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync());
}
[Fact]
public void FlushAsyncIsCalledFromCallingFlush()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
stream.Flush();
pipeWriter.Verify(m => m.FlushAsync(default));
}
[Fact]
public async Task FlushAsyncIsCalledFromCallingFlushAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
await stream.FlushAsync();
pipeWriter.Verify(m => m.FlushAsync(default));
}
[Fact]
public async Task FlushAsyncCancellationTokenIsPassedIntoFlushAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
var token = new CancellationToken();
await stream.FlushAsync(token);
pipeWriter.Verify(m => m.FlushAsync(token));
}
[Fact]
public void WriteAsyncIsCalledFromCallingWrite()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
stream.Write(new byte[1]);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<CancellationToken>()));
}
[Fact]
public async Task WriteAsyncIsCalledFromCallingWriteAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
await stream.WriteAsync(new byte[1]);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<CancellationToken>()));
}
[Fact]
public async Task WriteAsyncCancellationTokenIsPassedIntoWriteAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
var token = new CancellationToken();
await stream.WriteAsync(new byte[1], token);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), token));
}
[Fact]
public void WriteAsyncIsCalledFromBeginWrite()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
stream.BeginWrite(new byte[1], 0, 1, null, this);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<CancellationToken>()));
}
[Fact]
public async Task BeginAndEndWriteWork()
{
var expected = new byte[1];
var asyncResult = WritingStream.BeginWrite(expected, 0, 1, null, this);
WritingStream.EndWrite(asyncResult);
Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync());
}
[Fact]
public void BlockSyncIOThrows()
{
var writeOnlyPipeStream = new WriteOnlyPipeStream(Writer, allowSynchronousIO: false);
Assert.Throws<InvalidOperationException>(() => writeOnlyPipeStream.Write(new byte[0], 0, 0));
Assert.Throws<InvalidOperationException>(() => writeOnlyPipeStream.Flush());
}
}
}

View File

@ -0,0 +1,155 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class WritingAdaptersInteropTests : PipeStreamTest
{
[Fact]
public async Task CheckBasicWritePipeApi()
{
var pipe = new Pipe();
var writeOnlyStream = new WriteOnlyPipeStream(pipe.Writer);
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
await pipeWriter.WriteAsync(new byte[10]);
var res = await pipe.Reader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckNestedPipeApi()
{
var pipe = new Pipe();
var writer = pipe.Writer;
for (var i = 0; i < 3; i++)
{
var writeOnlyStream = new WriteOnlyPipeStream(writer);
writer = new StreamPipeWriter(writeOnlyStream);
}
await writer.WriteAsync(new byte[10]);
var res = await pipe.Reader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckBasicWriteStreamApi()
{
var stream = new MemoryStream();
var pipeWriter = new StreamPipeWriter(stream);
var writeOnlyStream = new WriteOnlyPipeStream(pipeWriter);
await writeOnlyStream.WriteAsync(new byte[10]);
stream.Position = 0;
var res = await ReadFromStreamAsByteArrayAsync(10, stream);
Assert.Equal(new byte[10], res);
}
[Fact]
public async Task CheckNestedStreamApi()
{
var stream = new MemoryStream();
Stream writeOnlyStream = stream;
for (var i = 0; i < 3; i++)
{
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
writeOnlyStream = new WriteOnlyPipeStream(pipeWriter);
}
await writeOnlyStream.WriteAsync(new byte[10]);
stream.Position = 0;
var res = await ReadFromStreamAsByteArrayAsync(10, stream);
Assert.Equal(new byte[10], res);
}
[Fact]
public async Task WritesCanBeCanceledViaProvidedCancellationToken()
{
var writeOnlyStream = new WriteOnlyPipeStream(new HangingPipeWriter());
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
var cts = new CancellationTokenSource(1);
await Assert.ThrowsAsync<TaskCanceledException>(async () => await pipeWriter.WriteAsync(new byte[1], cts.Token));
}
[Fact]
public async Task WriteCanBeCanceledViaCancelPendingFlushWhenFlushIsAsync()
{
var writeOnlyStream = new WriteOnlyPipeStream(new HangingPipeWriter());
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
FlushResult flushResult = new FlushResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
try
{
var writingTask = pipeWriter.WriteAsync(new byte[1]);
tcs.SetResult(0);
flushResult = await writingTask;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw ex;
}
});
await tcs.Task;
pipeWriter.CancelPendingFlush();
await task;
Assert.True(flushResult.IsCanceled);
}
private class HangingPipeWriter : PipeWriter
{
public override void Advance(int bytes)
{
}
public override void CancelPendingFlush()
{
throw new NotImplementedException();
}
public override void Complete(Exception exception = null)
{
throw new NotImplementedException();
}
public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
await Task.Delay(30000, cancellationToken);
return new FlushResult();
}
public override Memory<byte> GetMemory(int sizeHint = 0)
{
return new Memory<byte>(new byte[4096]);
}
public override Span<byte> GetSpan(int sizeHint = 0)
{
return new Span<byte>(new byte[4096]);
}
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
{
throw new NotImplementedException();
}
}
}
}

View File

@ -68,7 +68,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = ReadAsync(buffer, offset, count, default(CancellationToken), state);
var task = ReadAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));

View File

@ -37,7 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public override void Flush()
{
FlushAsync(default(CancellationToken)).GetAwaiter().GetResult();
FlushAsync(default).GetAwaiter().GetResult();
}
public override Task FlushAsync(CancellationToken cancellationToken)
@ -64,12 +64,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
throw new InvalidOperationException(CoreStrings.SynchronousWritesDisallowed);
}
WriteAsync(buffer, offset, count, default(CancellationToken)).GetAwaiter().GetResult();
WriteAsync(buffer, offset, count, default).GetAwaiter().GetResult();
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = WriteAsync(buffer, offset, count, default(CancellationToken), state);
var task = WriteAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));

View File

@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
try
{
if (!readableBuffer.IsEmpty)
if (readableBufferLength != 0)
{
// buffer.Length is int
actual = (int)Math.Min(readableBufferLength, buffer.Length);
@ -69,8 +69,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
// Make sure we don't double-count bytes on the next read.
_alreadyTimedBytes = readableBufferLength - actual;
var slice = readableBuffer.Slice(0, actual);
consumed = readableBuffer.GetPosition(actual);
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);
return actual;
@ -106,7 +106,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
try
{
if (!readableBuffer.IsEmpty)
if (readableBufferLength != 0)
{
foreach (var memory in readableBuffer)
{