From 3bd5f2c2ab82d6d870b7fcea484761294f955751 Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Fri, 8 Feb 2019 08:44:09 -0800 Subject: [PATCH] Return Completed ReadResult after stream returns 0 bytes (#7337) --- src/Http/Http/src/StreamPipeReader.cs | 5 ++ src/Http/Http/test/StreamPipeReaderTests.cs | 53 +++++++++++++++++---- src/Http/Http/test/StreamPipeTest.cs | 26 +++++----- src/Http/Http/test/StreamPipeWriterTests.cs | 16 +++---- 4 files changed, 70 insertions(+), 30 deletions(-) diff --git a/src/Http/Http/src/StreamPipeReader.cs b/src/Http/Http/src/StreamPipeReader.cs index d4966818a2..5ddcdb2cc3 100644 --- a/src/Http/Http/src/StreamPipeReader.cs +++ b/src/Http/Http/src/StreamPipeReader.cs @@ -208,6 +208,11 @@ namespace System.IO.Pipelines public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) { // TODO ReadyAsync needs to throw if there are overlapping reads. + if (_isWriterCompleted) + { + return new ReadResult(buffer: default, isCanceled: false, isCompleted: true); + } + ThrowIfCompleted(); // PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock) diff --git a/src/Http/Http/test/StreamPipeReaderTests.cs b/src/Http/Http/test/StreamPipeReaderTests.cs index b85a602c6d..8d777110f1 100644 --- a/src/Http/Http/test/StreamPipeReaderTests.cs +++ b/src/Http/Http/test/StreamPipeReaderTests.cs @@ -431,7 +431,7 @@ namespace System.IO.Pipelines.Tests [Fact] public async Task AsyncReadWorks() { - MemoryStream = new AsyncStream(); + Stream = new AsyncStream(); CreateReader(); WriteByteArray(2000); @@ -455,7 +455,7 @@ namespace System.IO.Pipelines.Tests Write(Encoding.ASCII.GetBytes(new string('a', 8))); var readResult = await Reader.ReadAsync(); Reader.AdvanceTo(readResult.Buffer.GetPosition(4), readResult.Buffer.End); - MemoryStream.Position = 0; + Stream.Position = 0; readResult = await Reader.ReadAsync(); var resultString = Encoding.ASCII.GetString(readResult.Buffer.ToArray()); @@ -471,11 +471,11 @@ namespace System.IO.Pipelines.Tests Write(Encoding.ASCII.GetBytes(new string('a', 8))); var readResult = await Reader.ReadAsync(); Reader.AdvanceTo(readResult.Buffer.GetPosition(4), readResult.Buffer.End); - MemoryStream.Position = 0; + Stream.Position = 0; readResult = await Reader.ReadAsync(); Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); - MemoryStream.Position = 0; + Stream.Position = 0; readResult = await Reader.ReadAsync(); var resultString = Encoding.ASCII.GetString(readResult.Buffer.ToArray()); @@ -506,14 +506,14 @@ namespace System.IO.Pipelines.Tests [Fact] public void SetMinimumReadThresholdOfZeroThrows() { - Assert.Throws(() => new StreamPipeReader(MemoryStream, + Assert.Throws(() => new StreamPipeReader(Stream, new StreamPipeReaderOptions(minimumSegmentSize: 4096, minimumReadThreshold: 0, new TestMemoryPool()))); } [Fact] public void SetOptionsToNullThrows() { - Assert.Throws(() => new StreamPipeReader(MemoryStream, null)); + Assert.Throws(() => new StreamPipeReader(Stream, null)); } [Fact] @@ -522,7 +522,7 @@ namespace System.IO.Pipelines.Tests Write(new byte[8]); var buffer = new byte[4]; - MemoryStream.Read(buffer, 0, buffer.Length); + Stream.Read(buffer, 0, buffer.Length); var readResult = await Reader.ReadAsync(); Assert.Equal(buffer, readResult.Buffer.ToArray()); @@ -599,6 +599,18 @@ namespace System.IO.Pipelines.Tests Assert.True(readResult.IsCompleted); } + [Fact] + public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow() + { + Stream = new ThrowAfterZeroByteReadStream(); + Reader = new StreamPipeReader(Stream); + var readResult = await Reader.ReadAsync(); + + readResult = await Reader.ReadAsync(); + Assert.True(readResult.Buffer.IsEmpty); + Assert.True(readResult.IsCompleted); + } + private async Task ReadFromPipeAsString() { var readResult = await Reader.ReadAsync(); @@ -609,7 +621,7 @@ namespace System.IO.Pipelines.Tests private string ReadFromStreamAsString(byte[] buffer) { - var res = MemoryStream.Read(buffer, 0, buffer.Length); + var res = Stream.Read(buffer, 0, buffer.Length); return Encoding.ASCII.GetString(buffer); } @@ -621,7 +633,7 @@ namespace System.IO.Pipelines.Tests private void CreateReader(int minimumSegmentSize = 16, int minimumReadThreshold = 4, MemoryPool memoryPool = null) { - Reader = new StreamPipeReader(MemoryStream, + Reader = new StreamPipeReader(Stream, new StreamPipeReaderOptions( minimumSegmentSize, minimumReadThreshold, @@ -661,5 +673,28 @@ namespace System.IO.Pipelines.Tests } #endif } + + private class ThrowAfterZeroByteReadStream : MemoryStream + { + private bool _throwOnNextCallToRead; + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsync(new Memory(buffer, offset, count)).AsTask(); + } + + public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + if (_throwOnNextCallToRead) + { + throw new Exception(); + } + var bytes = await base.ReadAsync(destination, cancellationToken); + if (bytes == 0) + { + _throwOnNextCallToRead = true; + } + return bytes; + } + } } } diff --git a/src/Http/Http/test/StreamPipeTest.cs b/src/Http/Http/test/StreamPipeTest.cs index 2d7f406c33..86b554c403 100644 --- a/src/Http/Http/test/StreamPipeTest.cs +++ b/src/Http/Http/test/StreamPipeTest.cs @@ -11,7 +11,7 @@ namespace System.IO.Pipelines.Tests protected const int MinimumSegmentSize = 4096; - public MemoryStream MemoryStream { get; set; } + public Stream Stream { get; set; } public PipeWriter Writer { get; set; } @@ -19,9 +19,9 @@ namespace System.IO.Pipelines.Tests protected StreamPipeTest() { - MemoryStream = new MemoryStream(); - Writer = new StreamPipeWriter(MemoryStream, MinimumSegmentSize, new TestMemoryPool()); - Reader = new StreamPipeReader(MemoryStream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, new TestMemoryPool())); + Stream = new MemoryStream(); + Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, new TestMemoryPool()); + Reader = new StreamPipeReader(Stream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, new TestMemoryPool())); } public void Dispose() @@ -44,27 +44,27 @@ namespace System.IO.Pipelines.Tests public void Write(byte[] data) { - MemoryStream.Write(data, 0, data.Length); - MemoryStream.Position = 0; + Stream.Write(data, 0, data.Length); + Stream.Position = 0; } public void WriteWithoutPosition(byte[] data) { - MemoryStream.Write(data, 0, data.Length); + Stream.Write(data, 0, data.Length); } public void Append(byte[] data) { - var originalPosition = MemoryStream.Position; - MemoryStream.Write(data, 0, data.Length); - MemoryStream.Position = originalPosition; + var originalPosition = Stream.Position; + Stream.Write(data, 0, data.Length); + Stream.Position = originalPosition; } public byte[] ReadWithoutFlush() { - MemoryStream.Position = 0; - var buffer = new byte[MemoryStream.Length]; - var result = MemoryStream.Read(buffer, 0, (int)MemoryStream.Length); + Stream.Position = 0; + var buffer = new byte[Stream.Length]; + var result = Stream.Read(buffer, 0, (int)Stream.Length); return buffer; } } diff --git a/src/Http/Http/test/StreamPipeWriterTests.cs b/src/Http/Http/test/StreamPipeWriterTests.cs index e2bfb584b6..aaba3a6101 100644 --- a/src/Http/Http/test/StreamPipeWriterTests.cs +++ b/src/Http/Http/test/StreamPipeWriterTests.cs @@ -32,7 +32,7 @@ namespace System.IO.Pipelines.Tests [InlineData(8000, 8000)] public async Task CanAdvanceWithPartialConsumptionOfFirstSegment(int firstWriteLength, int secondWriteLength) { - Writer = new StreamPipeWriter(MemoryStream, MinimumSegmentSize, new TestMemoryPool(maxBufferSize: 20000)); + Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, new TestMemoryPool(maxBufferSize: 20000)); await Writer.WriteAsync(Encoding.ASCII.GetBytes("a")); var expectedLength = firstWriteLength + secondWriteLength + 1; @@ -136,8 +136,8 @@ namespace System.IO.Pipelines.Tests [Fact(Skip = "https://github.com/aspnet/AspNetCore/issues/4621")] public async Task CancelPendingFlushBetweenWritesAllDataIsPreserved() { - MemoryStream = new SingleWriteStream(); - Writer = new StreamPipeWriter(MemoryStream); + Stream = new SingleWriteStream(); + Writer = new StreamPipeWriter(Stream); FlushResult flushResult = new FlushResult(); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -174,8 +174,8 @@ namespace System.IO.Pipelines.Tests [Fact] public async Task CancelPendingFlushAfterAllWritesAllDataIsPreserved() { - MemoryStream = new CannotFlushStream(); - Writer = new StreamPipeWriter(MemoryStream); + Stream = new CannotFlushStream(); + Writer = new StreamPipeWriter(Stream); FlushResult flushResult = new FlushResult(); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -211,8 +211,8 @@ namespace System.IO.Pipelines.Tests { var writeSize = 16; var singleWriteStream = new SingleWriteStream(); - MemoryStream = singleWriteStream; - Writer = new StreamPipeWriter(MemoryStream, minimumSegmentSize: writeSize); + Stream = singleWriteStream; + Writer = new StreamPipeWriter(Stream, minimumSegmentSize: writeSize); for (var i = 0; i < 10; i++) { @@ -392,7 +392,7 @@ namespace System.IO.Pipelines.Tests private void WriteStringToStream(string input) { var buffer = Encoding.ASCII.GetBytes(input); - MemoryStream.Write(buffer, 0, buffer.Length); + Stream.Write(buffer, 0, buffer.Length); } private async Task WriteStringToPipeWriter(string input)