Fix returning buffered data after stream is drained (#7476)
- Change TryRead to return the buffer if the Stream is completed
This commit is contained in:
parent
05923e9151
commit
c1bc210e8e
|
|
@ -21,7 +21,7 @@ namespace System.IO.Pipelines
|
|||
|
||||
private CancellationTokenSource _internalTokenSource;
|
||||
private bool _isReaderCompleted;
|
||||
private bool _isWriterCompleted;
|
||||
private bool _isStreamCompleted;
|
||||
private ExceptionDispatchInfo _exceptionInfo;
|
||||
|
||||
private BufferSegment _readHead;
|
||||
|
|
@ -206,11 +206,6 @@ namespace System.IO.Pipelines
|
|||
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
// TODO ReadyAsync needs to throw if there are overlapping reads.
|
||||
if (_isWriterCompleted)
|
||||
{
|
||||
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
|
||||
}
|
||||
|
||||
ThrowIfCompleted();
|
||||
|
||||
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
|
||||
|
|
@ -220,6 +215,11 @@ namespace System.IO.Pipelines
|
|||
return readResult;
|
||||
}
|
||||
|
||||
if (_isStreamCompleted)
|
||||
{
|
||||
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
|
||||
}
|
||||
|
||||
var reg = new CancellationTokenRegistration();
|
||||
if (cancellationToken.CanBeCanceled)
|
||||
{
|
||||
|
|
@ -251,7 +251,7 @@ namespace System.IO.Pipelines
|
|||
|
||||
if (length == 0)
|
||||
{
|
||||
_isWriterCompleted = true;
|
||||
_isStreamCompleted = true;
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
|
|
@ -296,7 +296,7 @@ namespace System.IO.Pipelines
|
|||
private bool TryReadInternal(CancellationTokenSource source, out ReadResult result)
|
||||
{
|
||||
var isCancellationRequested = source.IsCancellationRequested;
|
||||
if (isCancellationRequested || _bufferedBytes > 0 && !_examinedEverything)
|
||||
if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted))
|
||||
{
|
||||
// If TryRead/ReadAsync are called and cancellation is requested, we need to make sure memory is allocated for the ReadResult,
|
||||
// otherwise if someone calls advance afterward on the ReadResult, it will throw.
|
||||
|
|
@ -368,7 +368,7 @@ namespace System.IO.Pipelines
|
|||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private bool IsCompletedOrThrow()
|
||||
{
|
||||
if (!_isWriterCompleted)
|
||||
if (!_isStreamCompleted)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -642,9 +642,30 @@ namespace System.IO.Pipelines.Tests
|
|||
Assert.Equal(Stream, ((StreamPipeReader)Reader).InnerStream);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task BufferingDataPastEndOfStreamCanBeReadAgain()
|
||||
{
|
||||
var helloBytes = Encoding.ASCII.GetBytes("Hello World");
|
||||
Write(helloBytes);
|
||||
|
||||
var readResult = await Reader.ReadAsync();
|
||||
var buffer = readResult.Buffer;
|
||||
Reader.AdvanceTo(buffer.Start, buffer.End);
|
||||
|
||||
// Make sure IsCompleted is true
|
||||
readResult = await Reader.ReadAsync();
|
||||
buffer = readResult.Buffer;
|
||||
Reader.AdvanceTo(buffer.Start, buffer.End);
|
||||
Assert.True(readResult.IsCompleted);
|
||||
|
||||
var value = await ReadFromPipeAsString();
|
||||
Assert.Equal("Hello World", value);
|
||||
}
|
||||
|
||||
private async Task<string> ReadFromPipeAsString()
|
||||
{
|
||||
var readResult = await Reader.ReadAsync();
|
||||
|
||||
var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
|
||||
Reader.AdvanceTo(readResult.Buffer.End);
|
||||
return result;
|
||||
|
|
|
|||
Loading…
Reference in New Issue