Return Completed ReadResult after stream returns 0 bytes (#7337)

This commit is contained in:
Justin Kotalik 2019-02-08 08:44:09 -08:00 committed by GitHub
parent b2f850aa82
commit 3bd5f2c2ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 30 deletions

View File

@ -208,6 +208,11 @@ namespace System.IO.Pipelines
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// TODO ReadyAsync needs to throw if there are overlapping reads.
if (_isWriterCompleted)
{
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
}
ThrowIfCompleted();
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)

View File

@ -431,7 +431,7 @@ namespace System.IO.Pipelines.Tests
[Fact]
public async Task AsyncReadWorks()
{
MemoryStream = new AsyncStream();
Stream = new AsyncStream();
CreateReader();
WriteByteArray(2000);
@ -455,7 +455,7 @@ namespace System.IO.Pipelines.Tests
Write(Encoding.ASCII.GetBytes(new string('a', 8)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.GetPosition(4), readResult.Buffer.End);
MemoryStream.Position = 0;
Stream.Position = 0;
readResult = await Reader.ReadAsync();
var resultString = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
@ -471,11 +471,11 @@ namespace System.IO.Pipelines.Tests
Write(Encoding.ASCII.GetBytes(new string('a', 8)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.GetPosition(4), readResult.Buffer.End);
MemoryStream.Position = 0;
Stream.Position = 0;
readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
MemoryStream.Position = 0;
Stream.Position = 0;
readResult = await Reader.ReadAsync();
var resultString = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
@ -506,14 +506,14 @@ namespace System.IO.Pipelines.Tests
[Fact]
public void SetMinimumReadThresholdOfZeroThrows()
{
Assert.Throws<ArgumentOutOfRangeException>(() => new StreamPipeReader(MemoryStream,
Assert.Throws<ArgumentOutOfRangeException>(() => new StreamPipeReader(Stream,
new StreamPipeReaderOptions(minimumSegmentSize: 4096, minimumReadThreshold: 0, new TestMemoryPool())));
}
[Fact]
public void SetOptionsToNullThrows()
{
Assert.Throws<ArgumentNullException>(() => new StreamPipeReader(MemoryStream, null));
Assert.Throws<ArgumentNullException>(() => new StreamPipeReader(Stream, null));
}
[Fact]
@ -522,7 +522,7 @@ namespace System.IO.Pipelines.Tests
Write(new byte[8]);
var buffer = new byte[4];
MemoryStream.Read(buffer, 0, buffer.Length);
Stream.Read(buffer, 0, buffer.Length);
var readResult = await Reader.ReadAsync();
Assert.Equal(buffer, readResult.Buffer.ToArray());
@ -599,6 +599,18 @@ namespace System.IO.Pipelines.Tests
Assert.True(readResult.IsCompleted);
}
[Fact]
public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
{
Stream = new ThrowAfterZeroByteReadStream();
Reader = new StreamPipeReader(Stream);
var readResult = await Reader.ReadAsync();
readResult = await Reader.ReadAsync();
Assert.True(readResult.Buffer.IsEmpty);
Assert.True(readResult.IsCompleted);
}
private async Task<string> ReadFromPipeAsString()
{
var readResult = await Reader.ReadAsync();
@ -609,7 +621,7 @@ namespace System.IO.Pipelines.Tests
private string ReadFromStreamAsString(byte[] buffer)
{
var res = MemoryStream.Read(buffer, 0, buffer.Length);
var res = Stream.Read(buffer, 0, buffer.Length);
return Encoding.ASCII.GetString(buffer);
}
@ -621,7 +633,7 @@ namespace System.IO.Pipelines.Tests
private void CreateReader(int minimumSegmentSize = 16, int minimumReadThreshold = 4, MemoryPool<byte> memoryPool = null)
{
Reader = new StreamPipeReader(MemoryStream,
Reader = new StreamPipeReader(Stream,
new StreamPipeReaderOptions(
minimumSegmentSize,
minimumReadThreshold,
@ -661,5 +673,28 @@ namespace System.IO.Pipelines.Tests
}
#endif
}
private class ThrowAfterZeroByteReadStream : MemoryStream
{
private bool _throwOnNextCallToRead;
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsync(new Memory<byte>(buffer, offset, count)).AsTask();
}
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
if (_throwOnNextCallToRead)
{
throw new Exception();
}
var bytes = await base.ReadAsync(destination, cancellationToken);
if (bytes == 0)
{
_throwOnNextCallToRead = true;
}
return bytes;
}
}
}
}

View File

@ -11,7 +11,7 @@ namespace System.IO.Pipelines.Tests
protected const int MinimumSegmentSize = 4096;
public MemoryStream MemoryStream { get; set; }
public Stream Stream { get; set; }
public PipeWriter Writer { get; set; }
@ -19,9 +19,9 @@ namespace System.IO.Pipelines.Tests
protected StreamPipeTest()
{
MemoryStream = new MemoryStream();
Writer = new StreamPipeWriter(MemoryStream, MinimumSegmentSize, new TestMemoryPool());
Reader = new StreamPipeReader(MemoryStream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, new TestMemoryPool()));
Stream = new MemoryStream();
Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, new TestMemoryPool());
Reader = new StreamPipeReader(Stream, new StreamPipeReaderOptions(MinimumSegmentSize, minimumReadThreshold: 256, new TestMemoryPool()));
}
public void Dispose()
@ -44,27 +44,27 @@ namespace System.IO.Pipelines.Tests
public void Write(byte[] data)
{
MemoryStream.Write(data, 0, data.Length);
MemoryStream.Position = 0;
Stream.Write(data, 0, data.Length);
Stream.Position = 0;
}
public void WriteWithoutPosition(byte[] data)
{
MemoryStream.Write(data, 0, data.Length);
Stream.Write(data, 0, data.Length);
}
public void Append(byte[] data)
{
var originalPosition = MemoryStream.Position;
MemoryStream.Write(data, 0, data.Length);
MemoryStream.Position = originalPosition;
var originalPosition = Stream.Position;
Stream.Write(data, 0, data.Length);
Stream.Position = originalPosition;
}
public byte[] ReadWithoutFlush()
{
MemoryStream.Position = 0;
var buffer = new byte[MemoryStream.Length];
var result = MemoryStream.Read(buffer, 0, (int)MemoryStream.Length);
Stream.Position = 0;
var buffer = new byte[Stream.Length];
var result = Stream.Read(buffer, 0, (int)Stream.Length);
return buffer;
}
}

View File

@ -32,7 +32,7 @@ namespace System.IO.Pipelines.Tests
[InlineData(8000, 8000)]
public async Task CanAdvanceWithPartialConsumptionOfFirstSegment(int firstWriteLength, int secondWriteLength)
{
Writer = new StreamPipeWriter(MemoryStream, MinimumSegmentSize, new TestMemoryPool(maxBufferSize: 20000));
Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, new TestMemoryPool(maxBufferSize: 20000));
await Writer.WriteAsync(Encoding.ASCII.GetBytes("a"));
var expectedLength = firstWriteLength + secondWriteLength + 1;
@ -136,8 +136,8 @@ namespace System.IO.Pipelines.Tests
[Fact(Skip = "https://github.com/aspnet/AspNetCore/issues/4621")]
public async Task CancelPendingFlushBetweenWritesAllDataIsPreserved()
{
MemoryStream = new SingleWriteStream();
Writer = new StreamPipeWriter(MemoryStream);
Stream = new SingleWriteStream();
Writer = new StreamPipeWriter(Stream);
FlushResult flushResult = new FlushResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
@ -174,8 +174,8 @@ namespace System.IO.Pipelines.Tests
[Fact]
public async Task CancelPendingFlushAfterAllWritesAllDataIsPreserved()
{
MemoryStream = new CannotFlushStream();
Writer = new StreamPipeWriter(MemoryStream);
Stream = new CannotFlushStream();
Writer = new StreamPipeWriter(Stream);
FlushResult flushResult = new FlushResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
@ -211,8 +211,8 @@ namespace System.IO.Pipelines.Tests
{
var writeSize = 16;
var singleWriteStream = new SingleWriteStream();
MemoryStream = singleWriteStream;
Writer = new StreamPipeWriter(MemoryStream, minimumSegmentSize: writeSize);
Stream = singleWriteStream;
Writer = new StreamPipeWriter(Stream, minimumSegmentSize: writeSize);
for (var i = 0; i < 10; i++)
{
@ -392,7 +392,7 @@ namespace System.IO.Pipelines.Tests
private void WriteStringToStream(string input)
{
var buffer = Encoding.ASCII.GetBytes(input);
MemoryStream.Write(buffer, 0, buffer.Length);
Stream.Write(buffer, 0, buffer.Length);
}
private async Task WriteStringToPipeWriter(string input)