From df9e48b1f07a30d7b491107bab6b4483f097295e Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 24 Apr 2017 11:58:08 -0700 Subject: [PATCH] Add cancellation support for write async (#1736) --- .../Adapter/Internal/StreamSocketOutput.cs | 4 +- .../Internal/Http/OutputProducer.cs | 15 +- .../Internal/LibuvOutputConsumer.cs | 4 +- .../LibuvOutputConsumerTests.cs | 163 +++++++++++++++++- 4 files changed, 166 insertions(+), 20 deletions(-) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs index 926f0d1cc1..6055f271f5 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs @@ -12,8 +12,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { public class StreamSocketOutput : ISocketOutput { - private static readonly ArraySegment _nullBuffer = new ArraySegment(new byte[0]); - private readonly Stream _outputStream; private readonly IPipe _pipe; private readonly object _sync = new object(); @@ -57,7 +55,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal } } - flushAwaiter = writableBuffer.FlushAsync(); + flushAwaiter = writableBuffer.FlushAsync(cancellationToken); } await flushAwaiter; diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs index 1ced4a5b25..d8695f6efb 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs @@ -77,12 +77,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http writableBuffer.Commit(); } - return FlushAsync(writableBuffer); + return FlushAsync(writableBuffer, cancellationToken); } - private Task FlushAsync(WritableBuffer writableBuffer) + private Task FlushAsync(WritableBuffer writableBuffer, + CancellationToken cancellationToken) { - var awaitable = writableBuffer.FlushAsync(); + var awaitable = writableBuffer.FlushAsync(cancellationToken); if (awaitable.IsCompleted) { AbortIfNeeded(awaitable); @@ -90,10 +91,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // The flush task can't fail today return TaskCache.CompletedTask; } - return FlushAsyncAwaited(awaitable); + return FlushAsyncAwaited(awaitable, cancellationToken); } - private Task FlushAsyncAwaited(WritableBufferAwaitable awaitable) + private async Task FlushAsyncAwaited(WritableBufferAwaitable awaitable, CancellationToken cancellationToken) { // https://github.com/dotnet/corefxlab/issues/1334 // Since the flush awaitable doesn't currently support multiple awaiters @@ -112,8 +113,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http }); } } - - return _flushTcs.Task; + await _flushTcs.Task; + cancellationToken.ThrowIfCancellationRequested(); } private void AbortIfNeeded(WritableBufferAwaitable awaitable) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs index 8b62da54ed..54950c23f0 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs @@ -38,6 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal { var result = await _pipe.ReadAsync(); var buffer = result.Buffer; + var consumed = buffer.End; try { @@ -53,6 +54,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal if (writeResult.Error != null) { + consumed = buffer.Start; throw writeResult.Error; } } @@ -87,7 +89,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } finally { - _pipe.Advance(result.Buffer.End); + _pipe.Advance(consumed); } } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index 15cfc81453..eb99d26fb2 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -208,7 +208,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var bufferSize = maxResponseBufferSize - 1; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); - // Act + // Act var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken)); // Assert @@ -219,7 +219,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken)); await _mockLibuv.OnPostTask; - // Assert + // Assert // Too many bytes are already pre-completed for the second write to pre-complete. Assert.False(writeTask2.IsCompleted); @@ -270,7 +270,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var data = new byte[bufferSize]; var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); - // Act + // Act var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); // Assert @@ -337,7 +337,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var data = new byte[bufferSize]; var fullBuffer = new ArraySegment(data, 0, bufferSize); - // Act + // Act var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); // task1 should complete successfully as < _maxBytesPreCompleted @@ -358,7 +358,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.False(task2Success.IsCanceled); Assert.False(task2Success.IsFaulted); - // Third task is not completed + // Third task is not completed Assert.False(task3Canceled.IsCompleted); Assert.False(task3Canceled.IsCanceled); Assert.False(task3Canceled.IsFaulted); @@ -382,14 +382,159 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.False(task4Success.IsFaulted); // Third task is now canceled - // TODO: Cancellation isn't supported right now - // await Assert.ThrowsAsync(() => task3Canceled); - // Assert.True(task3Canceled.IsCanceled); + await Assert.ThrowsAsync(() => task3Canceled); + Assert.True(task3Canceled.IsCanceled); Assert.True(abortedSource.IsCancellationRequested); } } + [Theory] + [MemberData(nameof(PositiveMaxResponseBufferSizeData))] + public async Task CancelsBeforeWriteRequestCompletes(int maxResponseBufferSize) + { + var completeQueue = new ConcurrentQueue>(); + + // Arrange + _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => + { + completeQueue.Enqueue(triggerCompleted); + return 0; + }; + + var abortedSource = new CancellationTokenSource(); + + var pipeOptions = new PipeOptions + { + ReaderScheduler = _libuvThread, + MaximumSizeHigh = maxResponseBufferSize, + MaximumSizeLow = maxResponseBufferSize, + }; + + using (var socketOutput = CreateOutputProducer(pipeOptions)) + { + var bufferSize = maxResponseBufferSize - 1; + + var data = new byte[bufferSize]; + var fullBuffer = new ArraySegment(data, 0, bufferSize); + + // Act + var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + // task1 should complete successfully as < _maxBytesPreCompleted + + // First task is completed and successful + Assert.True(task1Success.IsCompleted); + Assert.False(task1Success.IsCanceled); + Assert.False(task1Success.IsFaulted); + + // following tasks should wait. + var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + + // Give time for tasks to percolate + await _mockLibuv.OnPostTask; + + // Third task is not completed + Assert.False(task3Canceled.IsCompleted); + Assert.False(task3Canceled.IsCanceled); + Assert.False(task3Canceled.IsFaulted); + + abortedSource.Cancel(); + + // Complete writes + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } + + // A final write guarantees that the error is observed by OutputProducer, + // but doesn't return a canceled/faulted task. + var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + Assert.True(task4Success.IsCompleted); + Assert.False(task4Success.IsCanceled); + Assert.False(task4Success.IsFaulted); + + // Third task is now canceled + await Assert.ThrowsAsync(() => task3Canceled); + Assert.True(task3Canceled.IsCanceled); + + Assert.True(abortedSource.IsCancellationRequested); + } + } + + [Theory] + [MemberData(nameof(PositiveMaxResponseBufferSizeData))] + public async Task WriteAsyncWithTokenAfterCallWithoutIsCancelled(int maxResponseBufferSize) + { + var completeQueue = new ConcurrentQueue>(); + + // Arrange + _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => + { + completeQueue.Enqueue(triggerCompleted); + return 0; + }; + + var abortedSource = new CancellationTokenSource(); + + var pipeOptions = new PipeOptions + { + ReaderScheduler = _libuvThread, + MaximumSizeHigh = maxResponseBufferSize, + MaximumSizeLow = maxResponseBufferSize, + }; + + using (var socketOutput = CreateOutputProducer(pipeOptions)) + { + var bufferSize = maxResponseBufferSize; + + var data = new byte[bufferSize]; + var fullBuffer = new ArraySegment(data, 0, bufferSize); + + // Act + var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + + // First task is not completed + Assert.False(task1Waits.IsCompleted); + Assert.False(task1Waits.IsCanceled); + Assert.False(task1Waits.IsFaulted); + + // following tasks should wait. + var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + + // Give time for tasks to percolate + await _mockLibuv.OnPostTask; + + // Third task is not completed + Assert.False(task3Canceled.IsCompleted); + Assert.False(task3Canceled.IsCanceled); + Assert.False(task3Canceled.IsFaulted); + + abortedSource.Cancel(); + + // Complete writes + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } + + // First task is completed + Assert.True(task1Waits.IsCompleted); + Assert.False(task1Waits.IsCanceled); + Assert.False(task1Waits.IsFaulted); + + // A final write guarantees that the error is observed by OutputProducer, + // but doesn't return a canceled/faulted task. + var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + Assert.True(task4Success.IsCompleted); + Assert.False(task4Success.IsCanceled); + Assert.False(task4Success.IsFaulted); + + // Third task is now canceled + await Assert.ThrowsAsync(() => task3Canceled); + Assert.True(task3Canceled.IsCanceled); + } + } + [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task WritesDontGetCompletedTooQuickly(int maxResponseBufferSize) @@ -436,7 +581,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var timeout = TimeSpan.FromSeconds(5); - // Assert + // Assert // Too many bytes are already pre-completed for the third but not the second write to pre-complete. // https://github.com/aspnet/KestrelHttpServer/issues/356 await writeTask2.TimeoutAfter(timeout);