Make StreamPipeWriter.Complete throw if there is unflushed data (#7115)

This commit is contained in:
Justin Kotalik 2019-01-30 11:22:53 -08:00 committed by GitHub
parent 7ea4a5f778
commit 8f51dd35ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 52 deletions

View File

@ -1,14 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@ -31,7 +25,6 @@ namespace System.IO.Pipelines
private CancellationTokenSource _internalTokenSource;
private bool _isCompleted;
private ExceptionDispatchInfo _exceptionInfo;
private object _lockObject = new object();
private CancellationTokenSource InternalTokenSource
@ -91,6 +84,15 @@ namespace System.IO.Pipelines
/// <inheritdoc />
public override Memory<byte> GetMemory(int sizeHint = 0)
{
if (_isCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
if (sizeHint < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(sizeHint));
}
EnsureCapacity(sizeHint);
return _currentSegment;
@ -99,6 +101,15 @@ namespace System.IO.Pipelines
/// <inheritdoc />
public override Span<byte> GetSpan(int sizeHint = 0)
{
if (_isCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
if (sizeHint < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(sizeHint));
}
EnsureCapacity(sizeHint);
return _currentSegment.Span.Slice(_position);
@ -119,10 +130,6 @@ namespace System.IO.Pipelines
}
_isCompleted = true;
if (exception != null)
{
_exceptionInfo = ExceptionDispatchInfo.Capture(exception);
}
_internalTokenSource?.Dispose();
@ -135,6 +142,12 @@ namespace System.IO.Pipelines
}
_currentSegmentOwner?.Dispose();
// We still want to cleanup segments before throwing an exception.
if (_bytesWritten > 0 && exception == null)
{
ThrowHelper.ThrowInvalidOperationException_DataNotAllFlushed();
}
}
/// <inheritdoc />
@ -148,7 +161,7 @@ namespace System.IO.Pipelines
{
if (_bytesWritten == 0)
{
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, IsCompletedOrThrow()));
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, _isCompleted));
}
return FlushAsyncInternal(cancellationToken);
@ -209,7 +222,7 @@ namespace System.IO.Pipelines
await _writingStream.FlushAsync(localToken);
return new FlushResult(isCanceled: false, IsCompletedOrThrow());
return new FlushResult(isCanceled: false, _isCompleted);
}
catch (OperationCanceledException)
{
@ -226,7 +239,7 @@ namespace System.IO.Pipelines
}
// Catch any cancellation and translate it into setting isCanceled = true
return new FlushResult(isCanceled: true, IsCompletedOrThrow());
return new FlushResult(isCanceled: true, _isCompleted);
}
}
}
@ -270,28 +283,6 @@ namespace System.IO.Pipelines
_position = 0;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsCompletedOrThrow()
{
if (!_isCompleted)
{
return false;
}
if (_exceptionInfo != null)
{
ThrowLatchedException();
}
return true;
}
[MethodImpl(MethodImplOptions.NoInlining)]
private void ThrowLatchedException()
{
_exceptionInfo.Throw();
}
public void Dispose()
{
Complete();

View File

@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Runtime.CompilerServices;
namespace System.IO.Pipelines
@ -31,5 +30,17 @@ namespace System.IO.Pipelines
public static void ThrowInvalidOperationException_SynchronousFlushesDisallowed() => throw CreateInvalidOperationException_SynchronousFlushesDisallowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_SynchronousFlushesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call FlushAsync or set allowSynchronousIO to true instead.");
public static void ThrowInvalidOperationException_DataNotAllFlushed() => throw CreateInvalidOperationException_DataNotAllFlushed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_DataNotAllFlushed() => new InvalidOperationException("Complete called without flushing the StreamPipeWriter. Call FlushAsync() before calling Complete().");
public static void ThrowInvalidOperationException_NoWritingAllowed() => throw CreateInvalidOperationException_NoWritingAllowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoWritingAllowed() => new InvalidOperationException("Writing is not allowed after writer was completed.");
public static void ThrowArgumentOutOfRangeException(string argument) => throw CreateArgumentOutOfRangeException(argument);
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateArgumentOutOfRangeException(string argument) => new ArgumentOutOfRangeException(argument);
}
}

View File

@ -47,15 +47,6 @@ namespace System.IO.Pipelines.Tests
Assert.Equal(expectedLength, Read().Length);
}
[Fact]
public async Task ThrowsOnCompleteAndWrite()
{
Writer.Complete(new InvalidOperationException("Whoops"));
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () => await Writer.FlushAsync());
Assert.Equal("Whoops", exception.Message);
}
[Fact]
public async Task WriteCanBeCancelledViaProvidedCancellationToken()
{
@ -117,17 +108,17 @@ namespace System.IO.Pipelines.Tests
}
[Fact]
public void FlushAsyncReturnsCanceledIfCanceledBeforeFlush()
public async Task FlushAsyncReturnsCanceledIfCanceledBeforeFlush()
{
CheckCanceledFlush();
await CheckCanceledFlush();
}
[Fact]
public void FlushAsyncReturnsCanceledIfCanceledBeforeFlushMultipleTimes()
public async Task FlushAsyncReturnsCanceledIfCanceledBeforeFlushMultipleTimes()
{
for (var i = 0; i < 10; i++)
{
CheckCanceledFlush();
await CheckCanceledFlush();
}
}
@ -136,7 +127,7 @@ namespace System.IO.Pipelines.Tests
{
for (var i = 0; i < 5; i++)
{
CheckCanceledFlush();
await CheckCanceledFlush();
await CheckWriteIsNotCanceled();
}
}
@ -316,6 +307,49 @@ namespace System.IO.Pipelines.Tests
Assert.Equal(expectedString, ReadAsString());
}
[Fact]
public void CallCompleteWithoutFlush_ThrowsInvalidOperationException()
{
var memory = Writer.GetMemory();
Writer.Advance(memory.Length);
var ex = Assert.Throws<InvalidOperationException>(() => Writer.Complete());
Assert.Equal(ThrowHelper.CreateInvalidOperationException_DataNotAllFlushed().Message, ex.Message);
}
[Fact]
public void CallCompleteWithoutFlushAndException_DoesNotThrowInvalidOperationException()
{
var memory = Writer.GetMemory();
Writer.Advance(memory.Length);
Writer.Complete(new Exception());
}
[Fact]
public void CallComplete_GetMemoryThrows()
{
Writer.Complete();
Assert.Throws<InvalidOperationException>(() => Writer.GetMemory());
}
[Fact]
public void CallComplete_GetSpanThrows()
{
Writer.Complete();
Assert.Throws<InvalidOperationException>(() => Writer.GetSpan());
}
[Fact]
public void CallGetMemoryWithNegativeSizeHint_ThrowsArgException()
{
Assert.Throws<ArgumentOutOfRangeException>(() => Writer.GetMemory(-1));
}
[Fact]
public void CallGetSpanWithNegativeSizeHint_ThrowsArgException()
{
Assert.Throws<ArgumentOutOfRangeException>(() => Writer.GetSpan(-1));
}
private void WriteStringToStream(string input)
{
var buffer = Encoding.ASCII.GetBytes(input);
@ -333,7 +367,7 @@ namespace System.IO.Pipelines.Tests
Assert.False(flushResult.IsCanceled);
}
private void CheckCanceledFlush()
private async Task CheckCanceledFlush()
{
PipeWriter writableBuffer = Writer.WriteEmpty(MaximumSizeHigh);
@ -344,6 +378,7 @@ namespace System.IO.Pipelines.Tests
Assert.True(flushAsync.IsCompleted);
FlushResult flushResult = flushAsync.GetAwaiter().GetResult();
Assert.True(flushResult.IsCanceled);
await writableBuffer.FlushAsync();
}
}