diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs
index d847c2aca6..50b2aebae7 100644
--- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs
+++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs
@@ -7,7 +7,6 @@ using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
-using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
@@ -88,11 +87,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
}
- public void OnConnectionClosed()
+ public async void OnConnectionClosed()
{
- _context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId);
Log.ConnectionStop(ConnectionId);
KestrelEventSource.Log.ConnectionStop(this);
+
+ // The connection is already in the "aborted" state by this point, but we want to track it
+ // until RequestProcessingAsync completes for graceful shutdown.
+ await StopAsync();
+
+ _context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId);
}
public async Task StopAsync()
diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs
index abf4bddf7d..58180c0f69 100644
--- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs
+++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs
@@ -253,7 +253,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
void IHttpRequestLifetimeFeature.Abort()
{
- Abort();
+ Abort(error: null);
}
}
}
diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs
index 9dc9106e48..775fed6858 100644
--- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs
+++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs
@@ -451,7 +451,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
///
/// Immediate kill the connection and poison the request and response streams.
///
- public void Abort(Exception error = null)
+ public void Abort(Exception error)
{
if (Interlocked.Exchange(ref _requestAborted, 1) == 0)
{
diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs
index 261db5c1d7..1ced4a5b25 100644
--- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs
+++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs
@@ -31,7 +31,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
// this is temporary until it does
private TaskCompletionSource _flushTcs;
private readonly object _flushLock = new object();
- private readonly Action _onFlushCallback;
public OutputProducer(IPipeWriter pipe, Frame frame, string connectionId, IKestrelTrace log)
{
@@ -39,7 +38,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_frame = frame;
_connectionId = connectionId;
_log = log;
- _onFlushCallback = OnFlush;
}
public Task WriteAsync(
@@ -87,6 +85,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
var awaitable = writableBuffer.FlushAsync();
if (awaitable.IsCompleted)
{
+ AbortIfNeeded(awaitable);
+
// The flush task can't fail today
return TaskCache.CompletedTask;
}
@@ -105,16 +105,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
_flushTcs = new TaskCompletionSource();
- awaitable.OnCompleted(_onFlushCallback);
+ awaitable.OnCompleted(() =>
+ {
+ AbortIfNeeded(awaitable);
+ _flushTcs.TrySetResult(null);
+ });
}
}
return _flushTcs.Task;
}
- private void OnFlush()
+ private void AbortIfNeeded(WritableBufferAwaitable awaitable)
{
- _flushTcs.TrySetResult(null);
+ try
+ {
+ awaitable.GetResult();
+ }
+ catch (Exception ex)
+ {
+ _frame.Abort(ex);
+ }
}
void ISocketOutput.Write(ArraySegment buffer, bool chunk)
@@ -126,7 +137,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
if (cancellationToken.IsCancellationRequested)
{
- _frame.Abort();
+ _frame.Abort(error: null);
_cancelled = true;
return Task.FromCanceled(cancellationToken);
}
diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs
index 483a4fd050..a6d7b6ac82 100644
--- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs
+++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs
@@ -56,7 +56,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private IConnectionHandler ConnectionHandler => ListenerContext.TransportContext.ConnectionHandler;
private LibuvThread Thread => ListenerContext.Thread;
- public void Start()
+ public async void Start()
{
try
{
@@ -64,45 +64,54 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
ConnectionId = _connectionContext.ConnectionId;
Input = _connectionContext.Input;
- Output = new LibuvOutputConsumer(_connectionContext.Output, Thread, _socket, this, ConnectionId, Log);
+ Output = new LibuvOutputConsumer(_connectionContext.Output, Thread, _socket, ConnectionId, Log);
// Start socket prior to applying the ConnectionAdapter
_socket.ReadStart(_allocCallback, _readCallback, this);
- // This *must* happen after socket.ReadStart
- // The socket output consumer is the only thing that can close the connection. If the
- // output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards.
- var ignore = Output.StartWrites();
+ try
+ {
+ // This *must* happen after socket.ReadStart
+ // The socket output consumer is the only thing that can close the connection. If the
+ // output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards.
+ await Output.WriteOutputAsync();
+ _connectionContext.Output.Complete();
+ }
+ catch (UvException ex)
+ {
+ _connectionContext.Output.Complete(ex);
+ }
+ finally
+ {
+ // Ensure the socket is disposed prior to completing in the input writer.
+ _socket.Dispose();
+ Input.Complete(new TaskCanceledException("The request was aborted"));
+ _socketClosedTcs.TrySetResult(null);
+ }
}
catch (Exception e)
{
- Log.LogError(0, e, "Connection.StartFrame");
- throw;
+ Log.LogCritical(0, e, $"{nameof(LibuvConnection)}.{nameof(Start)}() {ConnectionId}");
+ }
+ finally
+ {
+ _connectionContext.OnConnectionClosed();
}
}
- public Task StopAsync()
+ public async Task StopAsync()
{
- return Task.WhenAll(_connectionContext.StopAsync(), _socketClosedTcs.Task);
+ await _connectionContext.StopAsync();
+ await _socketClosedTcs.Task;
}
- public virtual Task AbortAsync(Exception error = null)
+ public Task AbortAsync(Exception error)
{
_connectionContext.Abort(error);
- return _socketClosedTcs.Task;
+ return StopAsync();
}
// Called on Libuv thread
- public virtual void Close()
- {
- _socket.Dispose();
-
- _connectionContext.OnConnectionClosed();
-
- Input.Complete(new TaskCanceledException("The request was aborted"));
- _socketClosedTcs.TrySetResult(null);
- }
-
private static LibuvFunctions.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state)
{
return ((LibuvConnection)state).OnAlloc(handle, suggestedSize);
diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs
index 9cc99b919a..d52baf4765 100644
--- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs
+++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs
@@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private void WalkConnectionsAndAbortCore(TaskCompletionSource tcs)
{
- WalkConnectionsCore(connection => connection.AbortAsync(), tcs);
+ WalkConnectionsCore(connection => connection.AbortAsync(error: null), tcs);
}
private void WalkConnectionsCore(Func action, TaskCompletionSource tcs)
diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs
index c81cf3f9e9..4d12a2174f 100644
--- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs
+++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs
@@ -12,7 +12,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
private readonly LibuvThread _thread;
private readonly UvStreamHandle _socket;
- private readonly LibuvConnection _connection;
private readonly string _connectionId;
private readonly ILibuvTrace _log;
@@ -23,7 +22,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
IPipeReader pipe,
LibuvThread thread,
UvStreamHandle socket,
- LibuvConnection connection,
string connectionId,
ILibuvTrace log)
{
@@ -32,13 +30,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
// get's scheduled
_thread = thread;
_socket = socket;
- _connection = connection;
_connectionId = connectionId;
_log = log;
_writeReqPool = thread.WriteReqPool;
}
- public async Task StartWrites()
+ public async Task WriteOutputAsync()
{
while (true)
{
@@ -53,7 +50,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
var writeResult = await writeReq.WriteAsync(_socket, buffer);
_writeReqPool.Return(writeReq);
- OnWriteCompleted(writeResult.Status, writeResult.Error);
+ LogWriteInfo(writeResult.Status, writeResult.Error);
+
+ if (writeResult.Error != null)
+ {
+ throw writeResult.Error;
+ }
}
if (result.IsCancelled)
@@ -74,27 +76,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
_pipe.Advance(result.Buffer.End);
}
}
-
- // We're done reading
- _pipe.Complete();
-
- // Close the connection
- _connection.Close();
}
- private void OnWriteCompleted(int writeStatus, Exception writeError)
+ private void LogWriteInfo(int status, Exception error)
{
- // Called inside _contextLock
- var status = writeStatus;
- var error = writeError;
-
- if (error != null)
- {
- // Abort the connection for any failed write
- // Queued on threadpool so get it in as first op.
- _connection.AbortAsync();
- }
-
if (error == null)
{
_log.ConnectionWriteCallback(_connectionId, status);
diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs
index 12bcbdfd95..8bfaa8ba48 100644
--- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs
+++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs
@@ -7,8 +7,10 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
+using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal;
+using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers;
using Microsoft.AspNetCore.Testing;
using Xunit;
@@ -316,66 +318,72 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
return 0;
};
- using (var mockConnection = new MockConnection())
+ var abortedSource = new CancellationTokenSource();
+
+ var pipeOptions = new PipeOptions
{
- var abortedSource = mockConnection.RequestAbortedSource;
+ ReaderScheduler = _libuvThread,
+ MaximumSizeHigh = maxResponseBufferSize,
+ MaximumSizeLow = maxResponseBufferSize,
+ };
- var pipeOptions = new PipeOptions
+ using (var socketOutput = CreateOutputProducer(pipeOptions, abortedSource))
+ {
+ var bufferSize = maxResponseBufferSize - 1;
+
+ var data = new byte[bufferSize];
+ var fullBuffer = new ArraySegment(data, 0, bufferSize);
+
+ // Act
+ var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
+ // task1 should complete successfully as < _maxBytesPreCompleted
+
+ // First task is completed and successful
+ Assert.True(task1Success.IsCompleted);
+ Assert.False(task1Success.IsCanceled);
+ Assert.False(task1Success.IsFaulted);
+
+ // following tasks should wait.
+ var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
+ var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
+
+ // Give time for tasks to percolate
+ await _mockLibuv.OnPostTask;
+
+ // Second task is not completed
+ Assert.False(task2Success.IsCompleted);
+ Assert.False(task2Success.IsCanceled);
+ Assert.False(task2Success.IsFaulted);
+
+ // Third task is not completed
+ Assert.False(task3Canceled.IsCompleted);
+ Assert.False(task3Canceled.IsCanceled);
+ Assert.False(task3Canceled.IsFaulted);
+
+ // Cause all writes to fail
+ while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
- ReaderScheduler = _libuvThread,
- MaximumSizeHigh = maxResponseBufferSize,
- MaximumSizeLow = maxResponseBufferSize,
- };
-
- using (var socketOutput = CreateOutputProducer(pipeOptions, mockConnection))
- {
- var bufferSize = maxResponseBufferSize - 1;
-
- var data = new byte[bufferSize];
- var fullBuffer = new ArraySegment(data, 0, bufferSize);
-
- // Act
- var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token);
- // task1 should complete successfully as < _maxBytesPreCompleted
-
- // First task is completed and successful
- Assert.True(task1Success.IsCompleted);
- Assert.False(task1Success.IsCanceled);
- Assert.False(task1Success.IsFaulted);
-
- // following tasks should wait.
- var task2Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken));
- var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token);
-
- // Give time for tasks to percolate
- await _mockLibuv.OnPostTask;
-
- // Second task is not completed
- Assert.False(task2Success.IsCompleted);
- Assert.False(task2Success.IsCanceled);
- Assert.False(task2Success.IsFaulted);
-
- // Third task is not completed
- Assert.False(task3Canceled.IsCompleted);
- Assert.False(task3Canceled.IsCanceled);
- Assert.False(task3Canceled.IsFaulted);
-
- // Cause all writes to fail
- while (completeQueue.TryDequeue(out var triggerNextCompleted))
- {
- await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted);
- }
-
- // Second task is now completed
- await task2Success.TimeoutAfter(TimeSpan.FromSeconds(5));
-
- // Third task is now canceled
- // TODO: Cancellation isn't supported right now
- // await Assert.ThrowsAsync(() => task3Canceled);
- // Assert.True(task3Canceled.IsCanceled);
-
- Assert.True(abortedSource.IsCancellationRequested);
+ await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted);
}
+
+ // Second task is now completed
+ Assert.True(task2Success.IsCompleted);
+ Assert.False(task2Success.IsCanceled);
+ Assert.False(task2Success.IsFaulted);
+
+ // A final write guarantees that the error is observed by OutputProducer,
+ // but doesn't return a canceled/faulted task.
+ var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
+ Assert.True(task4Success.IsCompleted);
+ Assert.False(task4Success.IsCanceled);
+ Assert.False(task4Success.IsFaulted);
+
+ // Third task is now canceled
+ // TODO: Cancellation isn't supported right now
+ // await Assert.ThrowsAsync(() => task3Canceled);
+ // Assert.True(task3Canceled.IsCanceled);
+
+ Assert.True(abortedSource.IsCancellationRequested);
}
}
@@ -488,22 +496,50 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
- private OutputProducer CreateOutputProducer(PipeOptions pipeOptions, MockConnection connection = null)
+ private OutputProducer CreateOutputProducer(PipeOptions pipeOptions, CancellationTokenSource cts = null)
{
var pipe = _pipeFactory.Create(pipeOptions);
var logger = new TestApplicationErrorLogger();
- var serviceContext = new TestServiceContext() { Log = new TestKestrelTrace(logger) };
- var transportContext = new TestLibuvTransportContext() { Log = new LibuvTrace(logger) };
+ var serviceContext = new TestServiceContext
+ {
+ Log = new TestKestrelTrace(logger),
+ ThreadPool = new InlineLoggingThreadPool(new TestKestrelTrace(logger))
+ };
+ var transportContext = new TestLibuvTransportContext { Log = new LibuvTrace(logger) };
var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext });
var socket = new MockSocket(_mockLibuv, _libuvThread.Loop.ThreadId, transportContext.Log);
var socketOutput = new OutputProducer(pipe.Writer, frame, "0", serviceContext.Log);
- var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, connection ?? new MockConnection(), "0", transportContext.Log);
- var ignore = consumer.StartWrites();
+ var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, "0", transportContext.Log);
+
+ frame.LifetimeControl = new ConnectionLifetimeControl("0", pipe.Reader, socketOutput, serviceContext.Log);
+
+ if (cts != null)
+ {
+ frame.RequestAborted.Register(cts.Cancel);
+ }
+
+ var ignore = WriteOutputAsync(consumer, pipe.Reader);
return socketOutput;
}
+
+ private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader)
+ {
+ // This WriteOutputAsync() calling code is equivalent to that in LibuvConnection.
+ try
+ {
+ // Ensure that outputReader.Complete() runs on the LibuvThread.
+ // Without ConfigureAwait(false), xunit will dispatch.
+ await consumer.WriteOutputAsync().ConfigureAwait(false);
+ outputReader.Complete();
+ }
+ catch (UvException ex)
+ {
+ outputReader.Complete(ex);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnection.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnection.cs
deleted file mode 100644
index 5e2802f976..0000000000
--- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnection.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright (c) .NET Foundation. All rights reserved.
-// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal;
-using Microsoft.Extensions.Internal;
-
-namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
-{
- public class MockConnection : LibuvConnection, IDisposable
- {
- private readonly TaskCompletionSource _socketClosedTcs = new TaskCompletionSource();
-
- public MockConnection()
- {
- RequestAbortedSource = new CancellationTokenSource();
- ListenerContext = new ListenerContext(new LibuvTransportContext());
- }
-
- public override Task AbortAsync(Exception error = null)
- {
- RequestAbortedSource?.Cancel();
- return TaskCache.CompletedTask;
- }
-
- public override void Close()
- {
- _socketClosedTcs.SetResult(null);
- }
-
- public CancellationTokenSource RequestAbortedSource { get; }
-
- public Task SocketClosed => _socketClosedTcs.Task;
-
- public void Dispose()
- {
- RequestAbortedSource.Dispose();
- }
- }
-}