diff --git a/src/Http/Http.Abstractions/src/Microsoft.AspNetCore.Http.Abstractions.csproj b/src/Http/Http.Abstractions/src/Microsoft.AspNetCore.Http.Abstractions.csproj index 380a4bd959..2f17e52019 100644 --- a/src/Http/Http.Abstractions/src/Microsoft.AspNetCore.Http.Abstractions.csproj +++ b/src/Http/Http.Abstractions/src/Microsoft.AspNetCore.Http.Abstractions.csproj @@ -22,7 +22,6 @@ Microsoft.AspNetCore.Http.HttpResponse - diff --git a/src/Http/Http.Features/src/IRequestBodyPipeFeature.cs b/src/Http/Http.Features/src/IRequestBodyPipeFeature.cs index 0c996ff691..503c205276 100644 --- a/src/Http/Http.Features/src/IRequestBodyPipeFeature.cs +++ b/src/Http/Http.Features/src/IRequestBodyPipeFeature.cs @@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Http.Features public interface IRequestBodyPipeFeature { /// - /// A representing the request body, if any. + /// A representing the request body, if any. /// PipeReader RequestBodyPipe { get; set; } } diff --git a/src/Http/Http.Features/test/Microsoft.AspNetCore.Http.Features.Tests.csproj b/src/Http/Http.Features/test/Microsoft.AspNetCore.Http.Features.Tests.csproj index c83d286a00..6a80fe588a 100644 --- a/src/Http/Http.Features/test/Microsoft.AspNetCore.Http.Features.Tests.csproj +++ b/src/Http/Http.Features/test/Microsoft.AspNetCore.Http.Features.Tests.csproj @@ -1,7 +1,7 @@ - netcoreapp3.0;net461 + netcoreapp3.0 diff --git a/src/Http/Http/src/BufferSegment.cs b/src/Http/Http/src/BufferSegment.cs index f0dcdc5077..735a4a39e0 100644 --- a/src/Http/Http/src/BufferSegment.cs +++ b/src/Http/Http/src/BufferSegment.cs @@ -7,7 +7,7 @@ using System.Runtime.CompilerServices; namespace System.IO.Pipelines { - public sealed class BufferSegment : ReadOnlySequenceSegment + internal sealed class BufferSegment : ReadOnlySequenceSegment { private IMemoryOwner _memoryOwner; private BufferSegment _next; diff --git a/src/Http/Http/src/Microsoft.AspNetCore.Http.csproj b/src/Http/Http/src/Microsoft.AspNetCore.Http.csproj index 1575488b80..d091f7d690 100644 --- a/src/Http/Http/src/Microsoft.AspNetCore.Http.csproj +++ b/src/Http/Http/src/Microsoft.AspNetCore.Http.csproj @@ -1,4 +1,4 @@ - + ASP.NET Core default HTTP feature implementations. @@ -19,7 +19,6 @@ - diff --git a/src/Http/Http/src/Properties/AssemblyInfo.cs b/src/Http/Http/src/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..2b8d94f4a5 --- /dev/null +++ b/src/Http/Http/src/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Microsoft.AspNetCore.Http.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")] diff --git a/src/Http/Http/src/ReadOnlyPipeStream.cs b/src/Http/Http/src/ReadOnlyPipeStream.cs new file mode 100644 index 0000000000..7585947d2c --- /dev/null +++ b/src/Http/Http/src/ReadOnlyPipeStream.cs @@ -0,0 +1,240 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + /// + /// Represents a read-only Stream backed by a PipeReader + /// + public class ReadOnlyPipeStream : Stream + { + private readonly PipeReader _pipeReader; + private bool _allowSynchronousIO = true; + + /// + /// Creates a new ReadOnlyPipeStream + /// + /// The PipeReader to read from. + public ReadOnlyPipeStream(PipeReader pipeReader) : + this(pipeReader, allowSynchronousIO: true) + { + } + + /// + /// Creates a new ReadOnlyPipeStream + /// + /// The PipeReader to read from. + /// Whether synchronous IO is allowed. + public ReadOnlyPipeStream(PipeReader pipeReader, bool allowSynchronousIO) + { + _allowSynchronousIO = allowSynchronousIO; + _pipeReader = pipeReader; + } + + /// + public override bool CanSeek => false; + + /// + public override bool CanRead => true; + + /// + public override bool CanWrite => false; + + /// + public override long Length => throw new NotSupportedException(); + + /// + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override int WriteTimeout + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + /// + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => throw new NotSupportedException(); + + /// + public override void Flush() + { + throw new NotSupportedException(); + } + + /// + public override Task FlushAsync(CancellationToken cancellationToken) + { + throw new NotSupportedException(); + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + /// + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + if (!_allowSynchronousIO) + { + ThrowHelper.ThrowInvalidOperationException_SynchronousReadsDisallowed(); + } + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + /// + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = ReadAsync(buffer, offset, count, default, state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + /// + public override int EndRead(IAsyncResult asyncResult) + { + return ((Task)asyncResult).GetAwaiter().GetResult(); + } + + private Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + var tcs = new TaskCompletionSource(state); + var task = ReadAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (task2.IsCanceled) + { + tcs2.SetCanceled(); + } + else if (task2.IsFaulted) + { + tcs2.SetException(task2.Exception); + } + else + { + tcs2.SetResult(task2.Result); + } + }, tcs, cancellationToken); + return tcs.Task; + } + + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + /// + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(destination, cancellationToken); + } + + private async ValueTask ReadAsyncInternal(Memory buffer, CancellationToken cancellationToken) + { + while (true) + { + var result = await _pipeReader.ReadAsync(cancellationToken); + var readableBuffer = result.Buffer; + var readableBufferLength = readableBuffer.Length; + + var consumed = readableBuffer.End; + var actual = 0; + try + { + if (readableBufferLength != 0) + { + actual = (int)Math.Min(readableBufferLength, buffer.Length); + + var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual); + consumed = slice.End; + slice.CopyTo(buffer.Span); + + return actual; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + _pipeReader.AdvanceTo(consumed); + } + } + } + + /// + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + if (destination == null) + { + throw new ArgumentNullException(nameof(destination)); + } + + if (bufferSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(bufferSize)); + } + + return CopyToAsyncInternal(destination, cancellationToken); + } + + private async Task CopyToAsyncInternal(Stream destination, CancellationToken cancellationToken) + { + while (true) + { + var result = await _pipeReader.ReadAsync(cancellationToken); + var readableBuffer = result.Buffer; + var readableBufferLength = readableBuffer.Length; + + try + { + if (readableBufferLength != 0) + { + foreach (var memory in readableBuffer) + { + await destination.WriteAsync(memory, cancellationToken); + } + } + + if (result.IsCompleted) + { + return; + } + } + finally + { + _pipeReader.AdvanceTo(readableBuffer.End); + } + } + } + } +} diff --git a/src/Http/Http/src/StreamPipeReader.cs b/src/Http/Http/src/StreamPipeReader.cs index 9d9e64caca..09295f2807 100644 --- a/src/Http/Http/src/StreamPipeReader.cs +++ b/src/Http/Http/src/StreamPipeReader.cs @@ -25,7 +25,8 @@ namespace System.IO.Pipelines private readonly MemoryPool _pool; private CancellationTokenSource _internalTokenSource; - private bool _isCompleted; + private bool _isReaderCompleted; + private bool _isWriterCompleted; private ExceptionDispatchInfo _exceptionInfo; private BufferSegment _readHead; @@ -182,12 +183,12 @@ namespace System.IO.Pipelines /// public override void Complete(Exception exception = null) { - if (_isCompleted) + if (_isReaderCompleted) { return; } - _isCompleted = true; + _isReaderCompleted = true; if (exception != null) { _exceptionInfo = ExceptionDispatchInfo.Capture(exception); @@ -248,6 +249,11 @@ namespace System.IO.Pipelines _readTail.End += length; _bufferedBytes += length; + + if (length == 0) + { + _isWriterCompleted = true; + } } catch (OperationCanceledException) { @@ -275,7 +281,7 @@ namespace System.IO.Pipelines private void ThrowIfCompleted() { - if (_isCompleted) + if (_isReaderCompleted) { ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } @@ -357,7 +363,7 @@ namespace System.IO.Pipelines [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool IsCompletedOrThrow() { - if (!_isCompleted) + if (!_isWriterCompleted) { return false; } diff --git a/src/Http/Http/src/StreamPipeWriter.cs b/src/Http/Http/src/StreamPipeWriter.cs index 6926f1e9b9..adfafe4455 100644 --- a/src/Http/Http/src/StreamPipeWriter.cs +++ b/src/Http/Http/src/StreamPipeWriter.cs @@ -65,7 +65,7 @@ namespace System.IO.Pipelines } /// - /// Gets the inner stream that is being read from. + /// Gets the inner stream that is being written to. /// public Stream InnerStream => _writingStream; diff --git a/src/Http/Http/src/ThrowHelper.cs b/src/Http/Http/src/ThrowHelper.cs index e671d9f6ee..1ae116b646 100644 --- a/src/Http/Http/src/ThrowHelper.cs +++ b/src/Http/Http/src/ThrowHelper.cs @@ -19,5 +19,17 @@ namespace System.IO.Pipelines public static void ThrowInvalidOperationException_NoDataRead() => throw CreateInvalidOperationException_NoDataRead(); [MethodImpl(MethodImplOptions.NoInlining)] public static Exception CreateInvalidOperationException_NoDataRead() => new InvalidOperationException("No data has been read into the StreamPipeReader."); + + public static void ThrowInvalidOperationException_SynchronousReadsDisallowed() => throw CreateInvalidOperationException_SynchronousReadsDisallowed(); + [MethodImpl(MethodImplOptions.NoInlining)] + public static Exception CreateInvalidOperationException_SynchronousReadsDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call ReadAsync or set allowSynchronousIO to true instead."); + + public static void ThrowInvalidOperationException_SynchronousWritesDisallowed() => throw CreateInvalidOperationException_SynchronousWritesDisallowed(); + [MethodImpl(MethodImplOptions.NoInlining)] + public static Exception CreateInvalidOperationException_SynchronousWritesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call WriteAsync or set allowSynchronousIO to true instead."); + + public static void ThrowInvalidOperationException_SynchronousFlushesDisallowed() => throw CreateInvalidOperationException_SynchronousFlushesDisallowed(); + [MethodImpl(MethodImplOptions.NoInlining)] + public static Exception CreateInvalidOperationException_SynchronousFlushesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call FlushAsync or set allowSynchronousIO to true instead."); } } diff --git a/src/Http/Http/src/WriteOnlyPipeStream.cs b/src/Http/Http/src/WriteOnlyPipeStream.cs new file mode 100644 index 0000000000..0f5121cc2f --- /dev/null +++ b/src/Http/Http/src/WriteOnlyPipeStream.cs @@ -0,0 +1,162 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + /// + /// Represents a WriteOnlyStream backed by a PipeWriter + /// + public class WriteOnlyPipeStream : Stream + { + private PipeWriter _pipeWriter; + private bool _allowSynchronousIO = true; + + /// + /// Creates a new WriteOnlyStream + /// + /// The PipeWriter to write to. + public WriteOnlyPipeStream(PipeWriter pipeWriter) : + this(pipeWriter, allowSynchronousIO: true) + { + } + + /// + /// Creates a new WriteOnlyStream + /// + /// The PipeWriter to write to. + /// Whether synchronous IO is allowed. + public WriteOnlyPipeStream(PipeWriter pipeWriter, bool allowSynchronousIO) + { + _pipeWriter = pipeWriter; + _allowSynchronousIO = allowSynchronousIO; + } + + /// + public override bool CanSeek => false; + + /// + public override bool CanRead => false; + + /// + public override bool CanWrite => true; + + /// + public override long Length => throw new NotSupportedException(); + + /// + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override int ReadTimeout + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => throw new NotSupportedException(); + + /// + public override void Flush() + { + if (!_allowSynchronousIO) + { + ThrowHelper.ThrowInvalidOperationException_SynchronousFlushesDisallowed(); + } + + FlushAsync(default).GetAwaiter().GetResult(); + } + + /// + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await _pipeWriter.FlushAsync(cancellationToken); + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + /// + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + /// + public override void Write(byte[] buffer, int offset, int count) + { + if (!_allowSynchronousIO) + { + ThrowHelper.ThrowInvalidOperationException_SynchronousWritesDisallowed(); + } + WriteAsync(buffer, offset, count, default).GetAwaiter().GetResult(); + } + + /// + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = WriteAsync(buffer, offset, count, default, state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + /// + public override void EndWrite(IAsyncResult asyncResult) + { + ((Task)asyncResult).GetAwaiter().GetResult(); + } + + private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + var tcs = new TaskCompletionSource(state); + var task = WriteAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (task2.IsCanceled) + { + tcs2.SetCanceled(); + } + else if (task2.IsFaulted) + { + tcs2.SetException(task2.Exception); + } + else + { + tcs2.SetResult(null); + } + }, tcs, cancellationToken); + return tcs.Task; + } + + /// + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); + } + + /// + public override async ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + await _pipeWriter.WriteAsync(source, cancellationToken); + } + } +} diff --git a/src/Http/Http/test/FlushResultCancellationTests.cs b/src/Http/Http/test/FlushResultCancellationTests.cs index e1f2476682..86dfac17c2 100644 --- a/src/Http/Http/test/FlushResultCancellationTests.cs +++ b/src/Http/Http/test/FlushResultCancellationTests.cs @@ -10,7 +10,7 @@ using Xunit; namespace System.IO.Pipelines.Tests { - public class FlushResultCancellationTests : PipeTest + public class FlushResultCancellationTests : StreamPipeTest { [Fact] public async Task FlushAsyncWithNewCancellationTokenNotAffectedByPrevious() diff --git a/src/Http/Http/test/PipeStreamTest.cs b/src/Http/Http/test/PipeStreamTest.cs new file mode 100644 index 0000000000..94b3ded7be --- /dev/null +++ b/src/Http/Http/test/PipeStreamTest.cs @@ -0,0 +1,85 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Buffers; +using System.Text; +using System.Threading.Tasks; + +namespace System.IO.Pipelines.Tests +{ + public class PipeStreamTest : IDisposable + { + public Stream ReadingStream { get; set; } + public Stream WritingStream { get; set; } + + public Pipe Pipe { get; set; } + + public PipeReader Reader => Pipe.Reader; + + public PipeWriter Writer => Pipe.Writer; + + public PipeStreamTest() + { + Pipe = new Pipe(); + ReadingStream = new ReadOnlyPipeStream(Reader); + WritingStream = new WriteOnlyPipeStream(Writer); + } + + public void Dispose() + { + Writer.Complete(); + Reader.Complete(); + } + + public async Task WriteStringToStreamAsync(string input) + { + await WritingStream.WriteAsync(Encoding.ASCII.GetBytes(input)); + } + + public async Task WriteStringToPipeAsync(string input) + { + await Writer.WriteAsync(Encoding.ASCII.GetBytes(input)); + } + + public async Task WriteByteArrayToPipeAsync(byte[] input) + { + await Writer.WriteAsync(input); + } + + public async Task ReadFromPipeAsStringAsync() + { + var readResult = await Reader.ReadAsync(); + var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray()); + Reader.AdvanceTo(readResult.Buffer.End); + return result; + } + + public async Task ReadFromStreamAsStringAsync() + { + var memory = new Memory(new byte[4096]); + var readLength = await ReadingStream.ReadAsync(memory); + var result = Encoding.ASCII.GetString(memory.ToArray(), 0, readLength); + return result; + } + + public async Task ReadFromPipeAsByteArrayAsync() + { + var readResult = await Reader.ReadAsync(); + var result = readResult.Buffer.ToArray(); + Reader.AdvanceTo(readResult.Buffer.End); + return result; + } + + public Task ReadFromStreamAsByteArrayAsync(int size) + { + return ReadFromStreamAsByteArrayAsync(size, ReadingStream); + } + + public async Task ReadFromStreamAsByteArrayAsync(int size, Stream stream) + { + var memory = new Memory(new byte[size]); + var readLength = await stream.ReadAsync(memory); + return memory.Slice(0, readLength).ToArray(); + } + } +} diff --git a/src/Http/Http/test/PipeWriterTests.cs b/src/Http/Http/test/PipeWriterTests.cs index 7edffabf2b..3343abd6f3 100644 --- a/src/Http/Http/test/PipeWriterTests.cs +++ b/src/Http/Http/test/PipeWriterTests.cs @@ -12,7 +12,7 @@ using Xunit; namespace System.IO.Pipelines.Tests { - public class PipeWriterTests : PipeTest + public class PipeWriterTests : StreamPipeTest { [Theory] diff --git a/src/Http/Http/test/ReadAsyncCancellationTests.cs b/src/Http/Http/test/ReadAsyncCancellationTests.cs index 72c80a7058..84d183c512 100644 --- a/src/Http/Http/test/ReadAsyncCancellationTests.cs +++ b/src/Http/Http/test/ReadAsyncCancellationTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; @@ -12,7 +12,7 @@ using Xunit; namespace System.IO.Pipelines.Tests { - public class ReadAsyncCancellationTests : PipeTest + public class ReadAsyncCancellationTests : StreamPipeTest { [Fact] public async Task AdvanceShouldResetStateIfReadCanceled() diff --git a/src/Http/Http/test/ReadOnlyPipeStreamTests.cs b/src/Http/Http/test/ReadOnlyPipeStreamTests.cs new file mode 100644 index 0000000000..427c30696a --- /dev/null +++ b/src/Http/Http/test/ReadOnlyPipeStreamTests.cs @@ -0,0 +1,164 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; +using Moq; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class ReadOnlyPipeStreamTests : PipeStreamTest + { + [Fact] + public void CanSeekFalse() + { + Assert.False(ReadingStream.CanSeek); + } + + [Fact] + public void CanReadTrue() + { + Assert.True(ReadingStream.CanRead); + } + + [Fact] + public void CanWriteFalse() + { + Assert.False(ReadingStream.CanWrite); + } + + [Fact] + public void LengthThrows() + { + Assert.Throws(() => ReadingStream.Length); + } + + [Fact] + public void PositionThrows() + { + Assert.Throws(() => ReadingStream.Position); + Assert.Throws(() => ReadingStream.Position = 1); + } + + [Fact] + public void SeekThrows() + { + Assert.Throws(() => ReadingStream.Seek(0, SeekOrigin.Begin)); + } + + [Fact] + public void SetLengthThrows() + { + Assert.Throws(() => ReadingStream.SetLength(1)); + } + + [Fact] + public void WriteThrows() + { + Assert.Throws(() => ReadingStream.Write(new byte[1], 0, 1)); + } + + [Fact] + public async Task WriteAsyncThrows() + { + await Assert.ThrowsAsync(async () => await ReadingStream.WriteAsync(new byte[1], 0, 1)); + } + + [Fact] + public void ReadTimeoutThrows() + { + Assert.Throws(() => ReadingStream.WriteTimeout = 1); + Assert.Throws(() => ReadingStream.WriteTimeout); + } + + [Fact] + public async Task ReadAsyncWorks() + { + var expected = "Hello World!"; + + await WriteStringToPipeAsync(expected); + + Assert.Equal(expected, await ReadFromStreamAsStringAsync()); + } + + [Fact] + public async Task BasicLargeRead() + { + var expected = new byte[8000]; + + await WriteByteArrayToPipeAsync(expected); + + Assert.Equal(expected, await ReadFromStreamAsByteArrayAsync(8000)); + } + + [Fact] + public async Task ReadAsyncIsCalledFromCallingRead() + { + var pipeReader = await SetupMockPipeReader(); + var stream = new ReadOnlyPipeStream(pipeReader.Object); + + stream.Read(new byte[1]); + + pipeReader.Verify(m => m.ReadAsync(It.IsAny())); + } + + [Fact] + public async Task ReadAsyncIsCalledFromCallingReadAsync() + { + var pipeReader = await SetupMockPipeReader(); + var stream = new ReadOnlyPipeStream(pipeReader.Object); + + await stream.ReadAsync(new byte[1]); + + pipeReader.Verify(m => m.ReadAsync(It.IsAny())); + } + + [Fact] + public async Task ReadAsyncCancellationTokenIsPassedIntoReadAsync() + { + var pipeReader = await SetupMockPipeReader(); + var stream = new ReadOnlyPipeStream(pipeReader.Object); + var token = new CancellationToken(); + + await stream.ReadAsync(new byte[1], token); + + pipeReader.Verify(m => m.ReadAsync(token)); + } + + [Fact] + public async Task CopyToAsyncWorks() + { + const int expectedSize = 8000; + var expected = new byte[expectedSize]; + + await WriteByteArrayToPipeAsync(expected); + + Writer.Complete(); + var destStream = new MemoryStream(); + + await ReadingStream.CopyToAsync(destStream); + + Assert.Equal(expectedSize, destStream.Length); + } + + [Fact] + public void BlockSyncIOThrows() + { + var readOnlyPipeStream = new ReadOnlyPipeStream(Reader, allowSynchronousIO: false); + Assert.Throws(() => readOnlyPipeStream.Read(new byte[0], 0, 0)); + } + + private async Task> SetupMockPipeReader() + { + await WriteByteArrayToPipeAsync(new byte[1]); + + var pipeReader = new Mock(); + pipeReader + .Setup(m => m.ReadAsync(It.IsAny())) + .Returns(new ValueTask(new ReadResult(new ReadOnlySequence(new byte[1]), false, false))); + return pipeReader; + } + } +} diff --git a/src/Http/Http/test/ReadingAdaptersInteropTests.cs b/src/Http/Http/test/ReadingAdaptersInteropTests.cs new file mode 100644 index 0000000000..fad1909b13 --- /dev/null +++ b/src/Http/Http/test/ReadingAdaptersInteropTests.cs @@ -0,0 +1,147 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class ReadingAdaptersInteropTests + { + [Fact] + public async Task CheckBasicReadPipeApi() + { + var pipe = new Pipe(); + var readStream = new ReadOnlyPipeStream(pipe.Reader); + var pipeReader = new StreamPipeReader(readStream); + + await pipe.Writer.WriteAsync(new byte[10]); + var res = await pipeReader.ReadAsync(); + Assert.Equal(new byte[10], res.Buffer.ToArray()); + } + + [Fact] + public async Task CheckNestedPipeApi() + { + var pipe = new Pipe(); + var reader = pipe.Reader; + for (var i = 0; i < 3; i++) + { + var readStream = new ReadOnlyPipeStream(reader); + reader = new StreamPipeReader(readStream); + } + + await pipe.Writer.WriteAsync(new byte[10]); + var res = await reader.ReadAsync(); + Assert.Equal(new byte[10], res.Buffer.ToArray()); + } + + [Fact] + public async Task CheckBasicReadStreamApi() + { + var stream = new MemoryStream(); + await stream.WriteAsync(new byte[10]); + stream.Position = 0; + + var pipeReader = new StreamPipeReader(stream); + var readOnlyStream = new ReadOnlyPipeStream(pipeReader); + + var resSize = await readOnlyStream.ReadAsync(new byte[10]); + + Assert.Equal(10, resSize); + } + + [Fact] + public async Task CheckNestedStreamApi() + { + var stream = new MemoryStream(); + await stream.WriteAsync(new byte[10]); + stream.Position = 0; + + Stream readOnlyStream = stream; + for (var i = 0; i < 3; i++) + { + var pipeReader = new StreamPipeReader(readOnlyStream); + readOnlyStream = new ReadOnlyPipeStream(pipeReader); + } + + var resSize = await readOnlyStream.ReadAsync(new byte[10]); + + Assert.Equal(10, resSize); + } + + [Fact] + public async Task ReadsCanBeCanceledViaProvidedCancellationToken() + { + var readOnlyStream = new ReadOnlyPipeStream(new HangingPipeReader()); + var pipeReader = new StreamPipeReader(readOnlyStream); + + var cts = new CancellationTokenSource(1); + await Task.Delay(1); + await Assert.ThrowsAsync(async () => await pipeReader.ReadAsync(cts.Token)); + } + + [Fact] + public async Task ReadCanBeCancelledViaCancelPendingReadWhenReadIsAsync() + { + var readOnlyStream = new ReadOnlyPipeStream(new HangingPipeReader()); + var pipeReader = new StreamPipeReader(readOnlyStream); + + var result = new ReadResult(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var task = Task.Run(async () => + { + var readingTask = pipeReader.ReadAsync(); + tcs.SetResult(0); + result = await readingTask; + }); + await tcs.Task; + pipeReader.CancelPendingRead(); + await task; + + Assert.True(result.IsCanceled); + } + + private class HangingPipeReader : PipeReader + { + public override void AdvanceTo(SequencePosition consumed) + { + throw new NotImplementedException(); + } + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + throw new NotImplementedException(); + } + + public override void CancelPendingRead() + { + throw new NotImplementedException(); + } + + public override void Complete(Exception exception = null) + { + throw new NotImplementedException(); + } + + public override void OnWriterCompleted(Action callback, object state) + { + throw new NotImplementedException(); + } + + public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + await Task.Delay(30000, cancellationToken); + return new ReadResult(); + } + + public override bool TryRead(out ReadResult result) + { + result = new ReadResult(); + return false; + } + } + } +} diff --git a/src/Http/Http/test/StreamPipeReaderTests.cs b/src/Http/Http/test/StreamPipeReaderTests.cs index 4c94bb6ce3..b85a602c6d 100644 --- a/src/Http/Http/test/StreamPipeReaderTests.cs +++ b/src/Http/Http/test/StreamPipeReaderTests.cs @@ -11,7 +11,7 @@ using Xunit; namespace System.IO.Pipelines.Tests { - public partial class StreamPipeReaderTests : PipeTest + public partial class StreamPipeReaderTests : StreamPipeTest { [Fact] public async Task CanRead() @@ -198,6 +198,8 @@ namespace System.IO.Pipelines.Tests [Fact] public async Task ReadAsyncReturnsCanceledInterleaved() { + Write(new byte[10000]); + // Cancel and Read interleaved to confirm cancellations are independent for (var i = 0; i < 3; i++) { @@ -501,22 +503,6 @@ namespace System.IO.Pipelines.Tests Assert.False(readResult.Buffer.IsSingleSegment); } - [Fact] - public async Task SetMinimumReadThresholdToMiminumSegmentSizeOnlyGetNewBlockWhenDataIsWritten() - { - CreateReader(minimumReadThreshold: 16); - WriteByteArray(0); - - var readResult = await Reader.ReadAsync(); - Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); - - WriteByteArray(16); - readResult = await Reader.ReadAsync(); - - Assert.Equal(16, readResult.Buffer.Length); - Assert.True(readResult.Buffer.IsSingleSegment); - } - [Fact] public void SetMinimumReadThresholdOfZeroThrows() { @@ -595,6 +581,24 @@ namespace System.IO.Pipelines.Tests Assert.Equal("c", ReadFromStreamAsString(buffer)); } + [Fact] + public async Task ReadAsyncWithNoDataCompletesReader() + { + var readResult = await Reader.ReadAsync(); + + Assert.True(readResult.IsCompleted); + } + + [Fact] + public async Task ReadAsyncWithEmptyDataCompletesStream() + { + WriteByteArray(0); + + var readResult = await Reader.ReadAsync(); + + Assert.True(readResult.IsCompleted); + } + private async Task ReadFromPipeAsString() { var readResult = await Reader.ReadAsync(); diff --git a/src/Http/Http/test/PipeTest.cs b/src/Http/Http/test/StreamPipeTest.cs similarity index 93% rename from src/Http/Http/test/PipeTest.cs rename to src/Http/Http/test/StreamPipeTest.cs index 01801b9518..2d7f406c33 100644 --- a/src/Http/Http/test/PipeTest.cs +++ b/src/Http/Http/test/StreamPipeTest.cs @@ -1,14 +1,11 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -using System; -using System.IO; -using System.IO.Pipelines; using System.Text; namespace System.IO.Pipelines.Tests { - public abstract class PipeTest : IDisposable + public abstract class StreamPipeTest : IDisposable { protected const int MaximumSizeHigh = 65; @@ -20,7 +17,7 @@ namespace System.IO.Pipelines.Tests public PipeReader Reader { get; set; } - protected PipeTest() + protected StreamPipeTest() { MemoryStream = new MemoryStream(); Writer = new StreamPipeWriter(MemoryStream, MinimumSegmentSize, new TestMemoryPool()); diff --git a/src/Http/Http/test/StreamPipeWriterTests.cs b/src/Http/Http/test/StreamPipeWriterTests.cs index d51bca9772..4810fedcda 100644 --- a/src/Http/Http/test/StreamPipeWriterTests.cs +++ b/src/Http/Http/test/StreamPipeWriterTests.cs @@ -12,7 +12,7 @@ using Xunit; namespace System.IO.Pipelines.Tests { - public class StreamPipeWriterTests : PipeTest + public class StreamPipeWriterTests : StreamPipeTest { [Fact] public async Task CanWriteAsyncMultipleTimesIntoSameBlock() diff --git a/src/Http/Http/test/WriteOnlyPipeStreamTests.cs b/src/Http/Http/test/WriteOnlyPipeStreamTests.cs new file mode 100644 index 0000000000..c6ee97660c --- /dev/null +++ b/src/Http/Http/test/WriteOnlyPipeStreamTests.cs @@ -0,0 +1,199 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading; +using System.Threading.Tasks; +using Moq; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class WriteOnlyPipeStreamTests : PipeStreamTest + { + [Fact] + public void CanSeekFalse() + { + Assert.False(WritingStream.CanSeek); + } + + [Fact] + public void CanReadFalse() + { + Assert.False(WritingStream.CanRead); + } + + [Fact] + public void CanWriteTrue() + { + Assert.True(WritingStream.CanWrite); + } + + [Fact] + public void LengthThrows() + { + Assert.Throws(() => WritingStream.Length); + } + + [Fact] + public void PositionThrows() + { + Assert.Throws(() => WritingStream.Position); + Assert.Throws(() => WritingStream.Position = 1); + } + + [Fact] + public void SeekThrows() + { + Assert.Throws(() => WritingStream.Seek(0, SeekOrigin.Begin)); + } + + [Fact] + public void SetLengthThrows() + { + Assert.Throws(() => WritingStream.SetLength(1)); + } + + [Fact] + public void ReadThrows() + { + Assert.Throws(() => WritingStream.Read(new byte[1], 0, 1)); + } + + [Fact] + public async Task ReadAsyncThrows() + { + await Assert.ThrowsAsync(async () => await WritingStream.ReadAsync(new byte[1], 0, 1)); + } + + [Fact] + public void ReadTimeoutThrows() + { + Assert.Throws(() => WritingStream.ReadTimeout = 1); + Assert.Throws(() => WritingStream.ReadTimeout); + } + + [Fact] + public async Task WriteAsyncWithReadOnlyMemoryWorks() + { + var expected = "Hello World!"; + + await WriteStringToStreamAsync(expected); + + Assert.Equal(expected, await ReadFromPipeAsStringAsync()); + } + + [Fact] + public async Task WriteAsyncWithArrayWorks() + { + var expected = new byte[1]; + + await WritingStream.WriteAsync(expected, 0, expected.Length); + + Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync()); + } + + [Fact] + public async Task BasicLargeWrite() + { + var expected = new byte[8000]; + + await WritingStream.WriteAsync(expected); + + Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync()); + } + + [Fact] + public void FlushAsyncIsCalledFromCallingFlush() + { + var pipeWriter = new Mock(); + var stream = new WriteOnlyPipeStream(pipeWriter.Object); + + stream.Flush(); + + pipeWriter.Verify(m => m.FlushAsync(default)); + } + + [Fact] + public async Task FlushAsyncIsCalledFromCallingFlushAsync() + { + var pipeWriter = new Mock(); + var stream = new WriteOnlyPipeStream(pipeWriter.Object); + + await stream.FlushAsync(); + + pipeWriter.Verify(m => m.FlushAsync(default)); + } + + [Fact] + public async Task FlushAsyncCancellationTokenIsPassedIntoFlushAsync() + { + var pipeWriter = new Mock(); + var stream = new WriteOnlyPipeStream(pipeWriter.Object); + var token = new CancellationToken(); + + await stream.FlushAsync(token); + + pipeWriter.Verify(m => m.FlushAsync(token)); + } + + [Fact] + public void WriteAsyncIsCalledFromCallingWrite() + { + var pipeWriter = new Mock(); + var stream = new WriteOnlyPipeStream(pipeWriter.Object); + + stream.Write(new byte[1]); + + pipeWriter.Verify(m => m.WriteAsync(It.IsAny>(), It.IsAny())); + } + + [Fact] + public async Task WriteAsyncIsCalledFromCallingWriteAsync() + { + var pipeWriter = new Mock(); + var stream = new WriteOnlyPipeStream(pipeWriter.Object); + + await stream.WriteAsync(new byte[1]); + + pipeWriter.Verify(m => m.WriteAsync(It.IsAny>(), It.IsAny())); + } + + [Fact] + public async Task WriteAsyncCancellationTokenIsPassedIntoWriteAsync() + { + var pipeWriter = new Mock(); + var stream = new WriteOnlyPipeStream(pipeWriter.Object); + var token = new CancellationToken(); + + await stream.WriteAsync(new byte[1], token); + + pipeWriter.Verify(m => m.WriteAsync(It.IsAny>(), token)); + } + + [Fact] + public void WriteAsyncIsCalledFromBeginWrite() + { + var pipeWriter = new Mock(); + var stream = new WriteOnlyPipeStream(pipeWriter.Object); + stream.BeginWrite(new byte[1], 0, 1, null, this); + pipeWriter.Verify(m => m.WriteAsync(It.IsAny>(), It.IsAny())); + } + + [Fact] + public async Task BeginAndEndWriteWork() + { + var expected = new byte[1]; + var asyncResult = WritingStream.BeginWrite(expected, 0, 1, null, this); + WritingStream.EndWrite(asyncResult); + Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync()); + } + + [Fact] + public void BlockSyncIOThrows() + { + var writeOnlyPipeStream = new WriteOnlyPipeStream(Writer, allowSynchronousIO: false); + Assert.Throws(() => writeOnlyPipeStream.Write(new byte[0], 0, 0)); + Assert.Throws(() => writeOnlyPipeStream.Flush()); + } + } +} diff --git a/src/Http/Http/test/WritingAdaptersInteropTests.cs b/src/Http/Http/test/WritingAdaptersInteropTests.cs new file mode 100644 index 0000000000..44280961b2 --- /dev/null +++ b/src/Http/Http/test/WritingAdaptersInteropTests.cs @@ -0,0 +1,155 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Buffers; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class WritingAdaptersInteropTests : PipeStreamTest + { + [Fact] + public async Task CheckBasicWritePipeApi() + { + var pipe = new Pipe(); + var writeOnlyStream = new WriteOnlyPipeStream(pipe.Writer); + var pipeWriter = new StreamPipeWriter(writeOnlyStream); + await pipeWriter.WriteAsync(new byte[10]); + + var res = await pipe.Reader.ReadAsync(); + Assert.Equal(new byte[10], res.Buffer.ToArray()); + } + + [Fact] + public async Task CheckNestedPipeApi() + { + var pipe = new Pipe(); + var writer = pipe.Writer; + for (var i = 0; i < 3; i++) + { + var writeOnlyStream = new WriteOnlyPipeStream(writer); + writer = new StreamPipeWriter(writeOnlyStream); + } + + await writer.WriteAsync(new byte[10]); + + var res = await pipe.Reader.ReadAsync(); + Assert.Equal(new byte[10], res.Buffer.ToArray()); + } + + [Fact] + public async Task CheckBasicWriteStreamApi() + { + var stream = new MemoryStream(); + var pipeWriter = new StreamPipeWriter(stream); + var writeOnlyStream = new WriteOnlyPipeStream(pipeWriter); + + await writeOnlyStream.WriteAsync(new byte[10]); + + stream.Position = 0; + var res = await ReadFromStreamAsByteArrayAsync(10, stream); + Assert.Equal(new byte[10], res); + } + + [Fact] + public async Task CheckNestedStreamApi() + { + var stream = new MemoryStream(); + Stream writeOnlyStream = stream; + for (var i = 0; i < 3; i++) + { + var pipeWriter = new StreamPipeWriter(writeOnlyStream); + writeOnlyStream = new WriteOnlyPipeStream(pipeWriter); + } + + await writeOnlyStream.WriteAsync(new byte[10]); + + stream.Position = 0; + var res = await ReadFromStreamAsByteArrayAsync(10, stream); + Assert.Equal(new byte[10], res); + } + + [Fact] + public async Task WritesCanBeCanceledViaProvidedCancellationToken() + { + var writeOnlyStream = new WriteOnlyPipeStream(new HangingPipeWriter()); + var pipeWriter = new StreamPipeWriter(writeOnlyStream); + var cts = new CancellationTokenSource(1); + await Assert.ThrowsAsync(async () => await pipeWriter.WriteAsync(new byte[1], cts.Token)); + } + + [Fact] + public async Task WriteCanBeCanceledViaCancelPendingFlushWhenFlushIsAsync() + { + var writeOnlyStream = new WriteOnlyPipeStream(new HangingPipeWriter()); + var pipeWriter = new StreamPipeWriter(writeOnlyStream); + + FlushResult flushResult = new FlushResult(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var task = Task.Run(async () => + { + try + { + var writingTask = pipeWriter.WriteAsync(new byte[1]); + tcs.SetResult(0); + flushResult = await writingTask; + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); + throw ex; + } + }); + + await tcs.Task; + + pipeWriter.CancelPendingFlush(); + + await task; + + Assert.True(flushResult.IsCanceled); + } + + private class HangingPipeWriter : PipeWriter + { + public override void Advance(int bytes) + { + } + + public override void CancelPendingFlush() + { + throw new NotImplementedException(); + } + + public override void Complete(Exception exception = null) + { + throw new NotImplementedException(); + } + + public override async ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + await Task.Delay(30000, cancellationToken); + return new FlushResult(); + } + + public override Memory GetMemory(int sizeHint = 0) + { + return new Memory(new byte[4096]); + } + + public override Span GetSpan(int sizeHint = 0) + { + return new Span(new byte[4096]); + } + + public override void OnReaderCompleted(Action callback, object state) + { + throw new NotImplementedException(); + } + } + } +} diff --git a/src/Servers/Kestrel/Core/src/Internal/Http/HttpRequestStream.cs b/src/Servers/Kestrel/Core/src/Internal/Http/HttpRequestStream.cs index a160d9180e..31d73b2481 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http/HttpRequestStream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http/HttpRequestStream.cs @@ -68,7 +68,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { - var task = ReadAsync(buffer, offset, count, default(CancellationToken), state); + var task = ReadAsync(buffer, offset, count, default, state); if (callback != null) { task.ContinueWith(t => callback.Invoke(t)); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http/HttpResponseStream.cs b/src/Servers/Kestrel/Core/src/Internal/Http/HttpResponseStream.cs index 2d50902e31..4ec97117cb 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http/HttpResponseStream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http/HttpResponseStream.cs @@ -37,7 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public override void Flush() { - FlushAsync(default(CancellationToken)).GetAwaiter().GetResult(); + FlushAsync(default).GetAwaiter().GetResult(); } public override Task FlushAsync(CancellationToken cancellationToken) @@ -64,12 +64,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http throw new InvalidOperationException(CoreStrings.SynchronousWritesDisallowed); } - WriteAsync(buffer, offset, count, default(CancellationToken)).GetAwaiter().GetResult(); + WriteAsync(buffer, offset, count, default).GetAwaiter().GetResult(); } public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { - var task = WriteAsync(buffer, offset, count, default(CancellationToken), state); + var task = WriteAsync(buffer, offset, count, default, state); if (callback != null) { task.ContinueWith(t => callback.Invoke(t)); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http/MessageBody.cs b/src/Servers/Kestrel/Core/src/Internal/Http/MessageBody.cs index 30248cac50..5dfe295497 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http/MessageBody.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http/MessageBody.cs @@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http try { - if (!readableBuffer.IsEmpty) + if (readableBufferLength != 0) { // buffer.Length is int actual = (int)Math.Min(readableBufferLength, buffer.Length); @@ -69,8 +69,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // Make sure we don't double-count bytes on the next read. _alreadyTimedBytes = readableBufferLength - actual; - var slice = readableBuffer.Slice(0, actual); - consumed = readableBuffer.GetPosition(actual); + var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual); + consumed = slice.End; slice.CopyTo(buffer.Span); return actual; @@ -106,7 +106,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http try { - if (!readableBuffer.IsEmpty) + if (readableBufferLength != 0) { foreach (var memory in readableBuffer) {