From c1bc210e8ebb6402ac74f4705d5748bc8e3ee544 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Mon, 11 Feb 2019 20:45:15 -0800 Subject: [PATCH] Fix returning buffered data after stream is drained (#7476) - Change TryRead to return the buffer if the Stream is completed --- src/Http/Http/src/StreamPipeReader.cs | 18 +++++++++--------- src/Http/Http/test/StreamPipeReaderTests.cs | 21 +++++++++++++++++++++ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/Http/Http/src/StreamPipeReader.cs b/src/Http/Http/src/StreamPipeReader.cs index db19ac8313..cbc360ddea 100644 --- a/src/Http/Http/src/StreamPipeReader.cs +++ b/src/Http/Http/src/StreamPipeReader.cs @@ -21,7 +21,7 @@ namespace System.IO.Pipelines private CancellationTokenSource _internalTokenSource; private bool _isReaderCompleted; - private bool _isWriterCompleted; + private bool _isStreamCompleted; private ExceptionDispatchInfo _exceptionInfo; private BufferSegment _readHead; @@ -206,11 +206,6 @@ namespace System.IO.Pipelines public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) { // TODO ReadyAsync needs to throw if there are overlapping reads. - if (_isWriterCompleted) - { - return new ReadResult(buffer: default, isCanceled: false, isCompleted: true); - } - ThrowIfCompleted(); // PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock) @@ -220,6 +215,11 @@ namespace System.IO.Pipelines return readResult; } + if (_isStreamCompleted) + { + return new ReadResult(buffer: default, isCanceled: false, isCompleted: true); + } + var reg = new CancellationTokenRegistration(); if (cancellationToken.CanBeCanceled) { @@ -251,7 +251,7 @@ namespace System.IO.Pipelines if (length == 0) { - _isWriterCompleted = true; + _isStreamCompleted = true; } } catch (OperationCanceledException) @@ -296,7 +296,7 @@ namespace System.IO.Pipelines private bool TryReadInternal(CancellationTokenSource source, out ReadResult result) { var isCancellationRequested = source.IsCancellationRequested; - if (isCancellationRequested || _bufferedBytes > 0 && !_examinedEverything) + if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted)) { // If TryRead/ReadAsync are called and cancellation is requested, we need to make sure memory is allocated for the ReadResult, // otherwise if someone calls advance afterward on the ReadResult, it will throw. @@ -368,7 +368,7 @@ namespace System.IO.Pipelines [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool IsCompletedOrThrow() { - if (!_isWriterCompleted) + if (!_isStreamCompleted) { return false; } diff --git a/src/Http/Http/test/StreamPipeReaderTests.cs b/src/Http/Http/test/StreamPipeReaderTests.cs index dc68506a05..97859f5940 100644 --- a/src/Http/Http/test/StreamPipeReaderTests.cs +++ b/src/Http/Http/test/StreamPipeReaderTests.cs @@ -642,9 +642,30 @@ namespace System.IO.Pipelines.Tests Assert.Equal(Stream, ((StreamPipeReader)Reader).InnerStream); } + [Fact] + public async Task BufferingDataPastEndOfStreamCanBeReadAgain() + { + var helloBytes = Encoding.ASCII.GetBytes("Hello World"); + Write(helloBytes); + + var readResult = await Reader.ReadAsync(); + var buffer = readResult.Buffer; + Reader.AdvanceTo(buffer.Start, buffer.End); + + // Make sure IsCompleted is true + readResult = await Reader.ReadAsync(); + buffer = readResult.Buffer; + Reader.AdvanceTo(buffer.Start, buffer.End); + Assert.True(readResult.IsCompleted); + + var value = await ReadFromPipeAsString(); + Assert.Equal("Hello World", value); + } + private async Task ReadFromPipeAsString() { var readResult = await Reader.ReadAsync(); + var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray()); Reader.AdvanceTo(readResult.Buffer.End); return result;