Add cancellation support for write async (#1736)

This commit is contained in:
Pavel Krymets 2017-04-24 11:58:08 -07:00 committed by GitHub
parent 71d2abed06
commit df9e48b1f0
4 changed files with 166 additions and 20 deletions

View File

@ -12,8 +12,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public class StreamSocketOutput : ISocketOutput
{
private static readonly ArraySegment<byte> _nullBuffer = new ArraySegment<byte>(new byte[0]);
private readonly Stream _outputStream;
private readonly IPipe _pipe;
private readonly object _sync = new object();
@ -57,7 +55,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
}
}
flushAwaiter = writableBuffer.FlushAsync();
flushAwaiter = writableBuffer.FlushAsync(cancellationToken);
}
await flushAwaiter;

View File

@ -77,12 +77,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
writableBuffer.Commit();
}
return FlushAsync(writableBuffer);
return FlushAsync(writableBuffer, cancellationToken);
}
private Task FlushAsync(WritableBuffer writableBuffer)
private Task FlushAsync(WritableBuffer writableBuffer,
CancellationToken cancellationToken)
{
var awaitable = writableBuffer.FlushAsync();
var awaitable = writableBuffer.FlushAsync(cancellationToken);
if (awaitable.IsCompleted)
{
AbortIfNeeded(awaitable);
@ -90,10 +91,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
// The flush task can't fail today
return TaskCache.CompletedTask;
}
return FlushAsyncAwaited(awaitable);
return FlushAsyncAwaited(awaitable, cancellationToken);
}
private Task FlushAsyncAwaited(WritableBufferAwaitable awaitable)
private async Task FlushAsyncAwaited(WritableBufferAwaitable awaitable, CancellationToken cancellationToken)
{
// https://github.com/dotnet/corefxlab/issues/1334
// Since the flush awaitable doesn't currently support multiple awaiters
@ -112,8 +113,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
});
}
}
return _flushTcs.Task;
await _flushTcs.Task;
cancellationToken.ThrowIfCancellationRequested();
}
private void AbortIfNeeded(WritableBufferAwaitable awaitable)

View File

@ -38,6 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
var result = await _pipe.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.End;
try
{
@ -53,6 +54,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
if (writeResult.Error != null)
{
consumed = buffer.Start;
throw writeResult.Error;
}
}
@ -87,7 +89,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
finally
{
_pipe.Advance(result.Buffer.End);
_pipe.Advance(consumed);
}
}
}

View File

@ -208,7 +208,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var bufferSize = maxResponseBufferSize - 1;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
// Act
// Act
var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken));
// Assert
@ -219,7 +219,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken));
await _mockLibuv.OnPostTask;
// Assert
// Assert
// Too many bytes are already pre-completed for the second write to pre-complete.
Assert.False(writeTask2.IsCompleted);
@ -270,7 +270,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var data = new byte[bufferSize];
var halfWriteBehindBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
// Act
var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
// Assert
@ -337,7 +337,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
// Act
var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// task1 should complete successfully as < _maxBytesPreCompleted
@ -358,7 +358,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
Assert.False(task2Success.IsCanceled);
Assert.False(task2Success.IsFaulted);
// Third task is not completed
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
@ -382,14 +382,159 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
// TODO: Cancellation isn't supported right now
// await Assert.ThrowsAsync<TaskCanceledException>(() => task3Canceled);
// Assert.True(task3Canceled.IsCanceled);
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
Assert.True(abortedSource.IsCancellationRequested);
}
}
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task CancelsBeforeWriteRequestCompletes(int maxResponseBufferSize)
{
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize - 1;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// task1 should complete successfully as < _maxBytesPreCompleted
// First task is completed and successful
Assert.True(task1Success.IsCompleted);
Assert.False(task1Success.IsCanceled);
Assert.False(task1Success.IsFaulted);
// following tasks should wait.
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
abortedSource.Cancel();
// Complete writes
while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
Assert.True(abortedSource.IsCancellationRequested);
}
}
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task WriteAsyncWithTokenAfterCallWithoutIsCancelled(int maxResponseBufferSize)
{
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
// First task is not completed
Assert.False(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
Assert.False(task1Waits.IsFaulted);
// following tasks should wait.
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
abortedSource.Cancel();
// Complete writes
while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
// First task is completed
Assert.True(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
Assert.False(task1Waits.IsFaulted);
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
}
}
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task WritesDontGetCompletedTooQuickly(int maxResponseBufferSize)
@ -436,7 +581,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var timeout = TimeSpan.FromSeconds(5);
// Assert
// Assert
// Too many bytes are already pre-completed for the third but not the second write to pre-complete.
// https://github.com/aspnet/KestrelHttpServer/issues/356
await writeTask2.TimeoutAfter(timeout);