diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs index 519d4440af..1e99ef698b 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs @@ -15,6 +15,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter private readonly Stream _filteredStream; private readonly Stream _socketInputStream; private readonly IKestrelTrace _log; + private readonly MemoryPool2 _memory; + private MemoryPoolBlock2 _block; public FilteredStreamAdapter( Stream filteredStream, @@ -28,24 +30,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter _log = logger; _filteredStream = filteredStream; _socketInputStream = new SocketInputStream(SocketInput); - - var block = memory.Lease(); - // Use pooled block for copy - _filteredStream.CopyToAsync(_socketInputStream, block).ContinueWith((task, state) => - { - var returnedBlock = task.Result; - returnedBlock.Pool.Return(returnedBlock); - - ((FilteredStreamAdapter)state).OnStreamClose(task); - }, this); + _memory = memory; } public SocketInput SocketInput { get; private set; } public ISocketOutput SocketOutput { get; private set; } + public Task ReadInputAsync() + { + _block = _memory.Lease(); + // Use pooled block for copy + return _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) => + { + ((FilteredStreamAdapter)state).OnStreamClose(task); + }, this); + } + private void OnStreamClose(Task copyAsyncTask) { + _memory.Return(_block); + if (copyAsyncTask.IsFaulted) { SocketInput.AbortAwaiting(); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamExtensions.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamExtensions.cs index 72c3fb9fd9..ccf0a47384 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamExtensions.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamExtensions.cs @@ -9,15 +9,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter { public static class StreamExtensions { - public static async Task CopyToAsync(this Stream source, Stream destination, MemoryPoolBlock2 block) + public static async Task CopyToAsync(this Stream source, Stream destination, MemoryPoolBlock2 block) { int bytesRead; while ((bytesRead = await source.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0) { await destination.WriteAsync(block.Array, block.Data.Offset, bytesRead); } - - return block; } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs index 76282b9239..6f4d79670e 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs @@ -4,6 +4,7 @@ using System; using System.Net; using System.Threading; +using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Filter; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Networking; @@ -31,6 +32,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http private readonly object _stateLock = new object(); private ConnectionState _connectionState; + private TaskCompletionSource _socketClosedTcs; + private Task _readFilteredInputTask = TaskUtilities.CompletedTask; private IPEndPoint _remoteEndPoint; private IPEndPoint _localEndPoint; @@ -44,6 +47,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http _rawSocketInput = new SocketInput(Memory2, ThreadPool); _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool); + + ConnectionManager.AddConnection(_connectionId, this); + } + + // Internal for testing + internal Connection() + { } public void Start() @@ -63,11 +73,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http // Don't initialize _frame until SocketInput and SocketOutput are set to their final values. if (ServerInformation.ConnectionFilter == null) { - SocketInput = _rawSocketInput; - SocketOutput = _rawSocketOutput; + lock (_stateLock) + { + if (_connectionState != ConnectionState.CreatingFrame) + { + throw new InvalidOperationException("Invalid connection state: " + _connectionState); + } - _frame = CreateFrame(); - _frame.Start(); + _connectionState = ConnectionState.Open; + + SocketInput = _rawSocketInput; + SocketOutput = _rawSocketOutput; + + _frame = CreateFrame(); + _frame.Start(); + } } else { @@ -109,21 +129,51 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - public virtual void Abort() + public Task StopAsync() { - if (_frame != null) + lock (_stateLock) { - // Frame.Abort calls user code while this method is always - // called from a libuv thread. - System.Threading.ThreadPool.QueueUserWorkItem(state => + switch (_connectionState) { - var connection = (Connection)state; - connection._frame.Abort(); - }, this); + case ConnectionState.SocketClosed: + return _readFilteredInputTask; + case ConnectionState.CreatingFrame: + _connectionState = ConnectionState.ToDisconnect; + break; + case ConnectionState.Open: + _frame.Stop(); + SocketInput.CompleteAwaiting(); + break; + } + + _socketClosedTcs = new TaskCompletionSource(); + return Task.WhenAll(_socketClosedTcs.Task, _readFilteredInputTask); } } - public void OnSocketClosed() + public virtual void Abort() + { + lock (_stateLock) + { + if (_connectionState == ConnectionState.CreatingFrame) + { + _connectionState = ConnectionState.ToDisconnect; + } + else + { + // Frame.Abort calls user code while this method is always + // called from a libuv thread. + System.Threading.ThreadPool.QueueUserWorkItem(state => + { + var connection = (Connection)state; + connection._frame.Abort(); + }, this); + } + } + } + + // Called on Libuv thread + public virtual void OnSocketClosed() { _rawSocketInput.Dispose(); @@ -133,25 +183,64 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { SocketInput.Dispose(); } + + lock (_stateLock) + { + _connectionState = ConnectionState.SocketClosed; + + if (_socketClosedTcs != null) + { + // This is always waited on synchronously, so it's safe to + // call on the libuv thread. + _socketClosedTcs.TrySetResult(null); + } + + if (_readFilteredInputTask.IsCompleted) + { + ConnectionManager.ConnectionStopped(_connectionId); + } + else + { + _readFilteredInputTask.ContinueWith((t, state) => + { + var connection = (Connection)state; + connection.ConnectionManager.ConnectionStopped(connection._connectionId); + }, this); + } + } } private void ApplyConnectionFilter() { - if (_filterContext.Connection != _libuvStream) + lock (_stateLock) { - var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool); + if (_connectionState == ConnectionState.CreatingFrame) + { + _connectionState = ConnectionState.Open; - SocketInput = filteredStreamAdapter.SocketInput; - SocketOutput = filteredStreamAdapter.SocketOutput; - } - else - { - SocketInput = _rawSocketInput; - SocketOutput = _rawSocketOutput; - } + if (_filterContext.Connection != _libuvStream) + { + var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool); - _frame = CreateFrame(); - _frame.Start(); + SocketInput = filteredStreamAdapter.SocketInput; + SocketOutput = filteredStreamAdapter.SocketOutput; + + _readFilteredInputTask = filteredStreamAdapter.ReadInputAsync(); + } + else + { + SocketInput = _rawSocketInput; + SocketOutput = _rawSocketOutput; + } + + _frame = CreateFrame(); + _frame.Start(); + } + else + { + ConnectionControl.End(ProduceEndType.SocketDisconnect); + } + } } private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state) @@ -227,16 +316,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { switch (endType) { - case ProduceEndType.SocketShutdownSend: - if (_connectionState != ConnectionState.Open) - { - return; - } - _connectionState = ConnectionState.Shutdown; - - Log.ConnectionWriteFin(_connectionId); - _rawSocketOutput.End(endType); - break; case ProduceEndType.ConnectionKeepAlive: if (_connectionState != ConnectionState.Open) { @@ -245,12 +324,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http Log.ConnectionKeepAlive(_connectionId); break; + case ProduceEndType.SocketShutdown: case ProduceEndType.SocketDisconnect: - if (_connectionState == ConnectionState.Disconnected) + if (_connectionState == ConnectionState.Disconnecting || + _connectionState == ConnectionState.SocketClosed) { return; } - _connectionState = ConnectionState.Disconnected; + _connectionState = ConnectionState.Disconnecting; Log.ConnectionDisconnect(_connectionId); _rawSocketOutput.End(endType); @@ -261,9 +342,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http private enum ConnectionState { + CreatingFrame, + ToDisconnect, Open, - Shutdown, - Disconnected + Disconnecting, + SocketClosed } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameOfT.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameOfT.cs index 68fd28bfeb..1c54363146 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameOfT.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameOfT.cs @@ -145,20 +145,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http // If _requestAborted is set, the connection has already been closed. if (Volatile.Read(ref _requestAborted) == 0) { - try - { - // Inform client no more data will ever arrive - ConnectionControl.End(ProduceEndType.SocketShutdownSend); - - // Wait for client to either disconnect or send unexpected data - await SocketInput; - } - finally - { - // Ensure we *always* disconnect the socket. - // Dispose socket - ConnectionControl.End(ProduceEndType.SocketDisconnect); - } + ConnectionControl.End(ProduceEndType.SocketShutdown); } } catch (Exception ex) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/IAsyncDisposable.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/IAsyncDisposable.cs new file mode 100644 index 0000000000..e2fb35aef6 --- /dev/null +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/IAsyncDisposable.cs @@ -0,0 +1,12 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Server.Kestrel.Http +{ + interface IAsyncDisposable + { + Task DisposeAsync(); + } +} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs index d43f54e58a..818807a203 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs @@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http /// /// Base class for listeners in Kestrel. Listens for incoming connections /// - public abstract class Listener : ListenerContext, IDisposable + public abstract class Listener : ListenerContext, IAsyncDisposable { protected Listener(ServiceContext serviceContext) : base(serviceContext) @@ -77,7 +77,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http connection.Start(); } - public void Dispose() + public virtual async Task DisposeAsync() { // Ensure the event loop is still running. // If the event loop isn't running and we try to wait on this Post @@ -85,34 +85,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http // the exception that stopped the event loop will never be surfaced. if (Thread.FatalError == null && ListenSocket != null) { - var tcs = new TaskCompletionSource(this); - Thread.Post( - tcs2 => + await Thread.PostAsync(state => + { + var listener = (Listener)state; + listener.ListenSocket.Dispose(); + }, this); + + await ConnectionManager.CloseConnectionsAsync(); + + await Thread.PostAsync(state => + { + var listener = (Listener)state; + var writeReqPool = listener.WriteReqPool; + while (writeReqPool.Count > 0) { - try - { - var socket = (Listener)tcs2.Task.AsyncState; - socket.ListenSocket.Dispose(); - - var writeReqPool = socket.WriteReqPool; - while (writeReqPool.Count > 0) - { - writeReqPool.Dequeue().Dispose(); - } - - tcs2.SetResult(0); - } - catch (Exception ex) - { - tcs2.SetException(ex); - } - }, - tcs); - - // REVIEW: Should we add a timeout here to be safe? - tcs.Task.Wait(); + writeReqPool.Dequeue().Dispose(); + } + }, this); } + Memory2.Dispose(); ListenSocket = null; } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs index 692759e7d8..1b8e090aa2 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs @@ -11,13 +11,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { public ListenerContext() { - Memory2 = new MemoryPool2(); } public ListenerContext(ServiceContext serviceContext) : base(serviceContext) { Memory2 = new MemoryPool2(); + ConnectionManager = new ConnectionManager(); WriteReqPool = new Queue(SocketOutput.MaxPooledWriteReqs); } @@ -27,6 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http ServerAddress = listenerContext.ServerAddress; Thread = listenerContext.Thread; Memory2 = listenerContext.Memory2; + ConnectionManager = listenerContext.ConnectionManager; WriteReqPool = listenerContext.WriteReqPool; Log = listenerContext.Log; } @@ -37,6 +38,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http public MemoryPool2 Memory2 { get; set; } + public ConnectionManager ConnectionManager { get; set; } + public Queue WriteReqPool { get; set; } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerPrimary.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerPrimary.cs index 58f46e14b8..30b27342d7 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerPrimary.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerPrimary.cs @@ -39,8 +39,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http await StartAsync(address, thread).ConfigureAwait(false); - await Thread.PostAsync(_this => _this.PostCallback(), - this).ConfigureAwait(false); + await Thread.PostAsync(state => ((ListenerPrimary)state).PostCallback(), + this).ConfigureAwait(false); } private void PostCallback() @@ -100,5 +100,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http socket); } } + + public override async Task DisposeAsync() + { + // Call base first so the ListenSocket gets closed and doesn't + // try to dispatch connections to closed pipes. + await base.DisposeAsync(); + + if (Thread.FatalError == null && ListenPipe != null) + { + await Thread.PostAsync(state => + { + var listener = (ListenerPrimary)state; + listener.ListenPipe.Dispose(); + + foreach (var dispatchPipe in listener._dispatchPipes) + { + dispatchPipe.Dispose(); + } + }, this); + } + } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs index 18d69c1dd5..a096dea1a1 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs @@ -5,7 +5,6 @@ using System; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; -using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Networking; using Microsoft.Extensions.Logging; @@ -16,7 +15,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http /// A secondary listener is delegated requests from a primary listener via a named pipe or /// UNIX domain socket. /// - public abstract class ListenerSecondary : ListenerContext, IDisposable + public abstract class ListenerSecondary : ListenerContext, IAsyncDisposable { private string _pipeName; private IntPtr _ptr; @@ -155,7 +154,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - public void Dispose() + public async Task DisposeAsync() { // Ensure the event loop is still running. // If the event loop isn't running and we try to wait on this Post @@ -163,16 +162,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http // the exception that stopped the event loop will never be surfaced. if (Thread.FatalError == null) { - Thread.Send(listener => + await Thread.PostAsync(state => { + var listener = (ListenerSecondary)state; listener.DispatchPipe.Dispose(); listener.FreeBuffer(); }, this); + + await ConnectionManager.CloseConnectionsAsync(); + + await Thread.PostAsync(state => + { + var listener = (ListenerSecondary)state; + var writeReqPool = listener.WriteReqPool; + while (writeReqPool.Count > 0) + { + writeReqPool.Dequeue().Dispose(); + } + }, this); } else { FreeBuffer(); } + + Memory2.Dispose(); } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ProduceEndType.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ProduceEndType.cs index 8e4831ffa2..4099e1c864 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ProduceEndType.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ProduceEndType.cs @@ -5,7 +5,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { public enum ProduceEndType { - SocketShutdownSend, + SocketShutdown, SocketDisconnect, ConnectionKeepAlive, } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketInput.cs index 2a3ac3d698..d5803f0698 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketInput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketInput.cs @@ -183,6 +183,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } + public void CompleteAwaiting() + { + Complete(); + } + public void AbortAwaiting() { _awaitableError = new TaskCanceledException("The request was aborted"); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs index 406d0b756a..bab2689334 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs @@ -20,7 +20,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http private const int _initialTaskQueues = 64; private const int _maxPooledWriteContexts = 32; - private static readonly WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); private static readonly Action _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered(); private readonly KestrelThread _thread; @@ -205,17 +204,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { switch (endType) { - case ProduceEndType.SocketShutdownSend: + case ProduceEndType.SocketShutdown: WriteAsync(default(ArraySegment), default(CancellationToken), socketShutdownSend: true, - socketDisconnect: false); + socketDisconnect: true, + isSync: true); break; case ProduceEndType.SocketDisconnect: WriteAsync(default(ArraySegment), default(CancellationToken), socketShutdownSend: false, - socketDisconnect: true); + socketDisconnect: true, + isSync: true); break; } } @@ -256,6 +257,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { MemoryPoolBlock2 blockToReturn = null; + lock (_returnLock) { Debug.Assert(!_lastStart.IsDefault); @@ -277,7 +279,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http if (blockToReturn != null) { - ThreadPool.QueueUserWorkItem(_returnBlocks, blockToReturn); + ReturnBlocks(blockToReturn); } } @@ -593,6 +595,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http return; } + Self._log.ConnectionWriteFin(Self._connectionId); + var shutdownReq = new UvShutdownReq(Self._log); shutdownReq.Init(Self._thread.Loop); shutdownReq.Shutdown(Self._socket, (_shutdownReq, status, state) => @@ -618,9 +622,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http return; } + // Ensure all blocks are returned before calling OnSocketClosed + // to ensure the MemoryPool doesn't get disposed too soon. + Self.ReturnAllBlocks(); Self._socket.Dispose(); Self._connection.OnSocketClosed(); - Self.ReturnAllBlocks(); Self._log.ConnectionStop(Self._connectionId); CompleteWithContextLock(); } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/IKestrelServerInformation.cs b/src/Microsoft.AspNetCore.Server.Kestrel/IKestrelServerInformation.cs index fa81591879..b2ec185075 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/IKestrelServerInformation.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/IKestrelServerInformation.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using Microsoft.AspNetCore.Server.Kestrel.Filter; namespace Microsoft.AspNetCore.Server.Kestrel @@ -9,6 +10,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel { int ThreadCount { get; set; } + /// + /// The amount of time after the server begins shutting down before connections will be forcefully closed. + /// By default, Kestrel will wait 5 seconds for any ongoing requests to complete before terminating + /// the connection. + /// A custom timeout can be configured using the "kestrel.shutdownTimeout" key in . + /// The value will be parsed as a float representing the timout in seconds. + /// + TimeSpan ShutdownTimeout { get; set; } + bool NoDelay { get; set; } /// diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/ConnectionManager.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/ConnectionManager.cs new file mode 100644 index 0000000000..c4a64c561c --- /dev/null +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/ConnectionManager.cs @@ -0,0 +1,59 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Server.Kestrel.Http; + +namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure +{ + public class ConnectionManager + { + private bool _managerClosed; + private ConcurrentDictionary _activeConnections = new ConcurrentDictionary(); + + public void AddConnection(long connectionId, Connection connection) + { + if (_managerClosed) + { + throw new InvalidOperationException(nameof(ConnectionManager) + " closed."); + } + + if (!_activeConnections.TryAdd(connectionId, connection)) + { + throw new InvalidOperationException("Connection already added."); + } + } + + public void ConnectionStopped(long connectionId) + { + Connection removed; + _activeConnections.TryRemove(connectionId, out removed); + } + + public Task CloseConnectionsAsync() + { + if (_managerClosed) + { + throw new InvalidOperationException(nameof(ConnectionManager) + " already closed."); + } + + _managerClosed = true; + + var stopTasks = new List(); + + foreach (var connectionId in _activeConnections.Keys) + { + Connection removed; + if (_activeConnections.TryRemove(connectionId, out removed)) + { + stopTasks.Add(removed.StopAsync()); + } + } + + return Task.WhenAll(stopTasks); + } + } +} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/IKestrelTrace.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/IKestrelTrace.cs index 0ed3c6565a..26af1717c2 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/IKestrelTrace.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/IKestrelTrace.cs @@ -33,6 +33,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure void ConnectionDisconnectedWrite(long connectionId, int count, Exception ex); + void NotAllConnectionsClosedGracefully(); + void ApplicationError(Exception ex); } } \ No newline at end of file diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs index bd9ac3020c..7933e11e84 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -27,8 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel private static readonly Action _threadCallbackAdapter = (callback, state) => ((Action)callback).Invoke((KestrelThread)state); private static readonly Action _socketCallbackAdapter = (callback, state) => ((Action)callback).Invoke((SocketOutput)state); private static readonly Action _tcsCallbackAdapter = (callback, state) => ((Action>)callback).Invoke((TaskCompletionSource)state); - private static readonly Action _listenerPrimaryCallbackAdapter = (callback, state) => ((Action)callback).Invoke((ListenerPrimary)state); - private static readonly Action _listenerSecondaryCallbackAdapter = (callback, state) => ((Action)callback).Invoke((ListenerSecondary)state); + private static readonly Action _postAsyncCallbackAdapter = (callback, state) => ((Action)callback).Invoke(state); private readonly KestrelEngine _engine; private readonly IApplicationLifetime _appLifetime; @@ -78,23 +77,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel return; } - var stepTimeout = (int)(timeout.TotalMilliseconds / 3); + var stepTimeout = (int)(timeout.TotalMilliseconds / 2); Post(t => t.OnStop()); if (!_thread.Join(stepTimeout)) { try { - Post(t => t.OnStopRude()); + Post(t => t.OnStopImmediate()); if (!_thread.Join(stepTimeout)) { - Post(t => t.OnStopImmediate()); - if (!_thread.Join(stepTimeout)) - { #if NET451 - _thread.Abort(); + _thread.Abort(); #endif - } } } catch (ObjectDisposedException) @@ -118,11 +113,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel private void OnStop() { - _post.Unreference(); - } - - private void OnStopRude() - { + // If the listeners were all disposed gracefully there should be no handles + // left to dispose other than _post. + // We dispose everything here in the event they are not closed gracefully. _engine.Libuv.walk( _loop, (ptr, arg) => @@ -134,6 +127,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel } }, IntPtr.Zero); + + _post.Unreference(); } private void OnStopImmediate() @@ -179,14 +174,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel _post.Send(); } - public Task PostAsync(Action callback, ListenerPrimary state) + public Task PostAsync(Action callback, object state) { var tcs = new TaskCompletionSource(); lock (_workSync) { _workAdding.Enqueue(new Work { - CallbackAdapter = _listenerPrimaryCallbackAdapter, + CallbackAdapter = _postAsyncCallbackAdapter, Callback = callback, State = state, Completion = tcs @@ -196,35 +191,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel return tcs.Task; } - public Task PostAsync(Action callback, ListenerSecondary state) - { - var tcs = new TaskCompletionSource(); - lock (_workSync) - { - _workAdding.Enqueue(new Work - { - CallbackAdapter = _listenerSecondaryCallbackAdapter, - Callback = callback, - State = state, - Completion = tcs - }); - } - _post.Send(); - return tcs.Task; - } - - public void Send(Action callback, ListenerSecondary state) - { - if (_loop.ThreadId == Thread.CurrentThread.ManagedThreadId) - { - callback.Invoke(state); - } - else - { - PostAsync(callback, state).Wait(); - } - } - private void PostCloseHandle(Action callback, IntPtr handle) { lock (_workSync) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelTrace.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelTrace.cs index 9be3819e8b..0edcd183fb 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelTrace.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelTrace.cs @@ -23,6 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel private static readonly Action _connectionDisconnect; private static readonly Action _connectionError; private static readonly Action _connectionDisconnectedWrite; + private static readonly Action _notAllConnectionsClosedGracefully; protected readonly ILogger _logger; @@ -37,12 +38,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel _connectionWriteFin = LoggerMessage.Define(LogLevel.Debug, 7, @"Connection id ""{ConnectionId}"" sending FIN."); _connectionWroteFin = LoggerMessage.Define(LogLevel.Debug, 8, @"Connection id ""{ConnectionId}"" sent FIN with status ""{Status}""."); _connectionKeepAlive = LoggerMessage.Define(LogLevel.Debug, 9, @"Connection id ""{ConnectionId}"" completed keep alive response."); - _connectionDisconnect = LoggerMessage.Define(LogLevel.Debug, 10, @"Connection id ""{ConnectionId}"" disconnected."); + _connectionDisconnect = LoggerMessage.Define(LogLevel.Debug, 10, @"Connection id ""{ConnectionId}"" disconnecting."); // ConnectionWrite: Reserved: 11 // ConnectionWriteCallback: Reserved: 12 // ApplicationError: Reserved: 13 - LoggerMessage.Define overload not present _connectionError = LoggerMessage.Define(LogLevel.Information, 14, @"Connection id ""{ConnectionId}"" communication error"); _connectionDisconnectedWrite = LoggerMessage.Define(LogLevel.Debug, 15, @"Connection id ""{ConnectionId}"" write of ""{count}"" bytes to disconnected client."); + _notAllConnectionsClosedGracefully = LoggerMessage.Define(LogLevel.Debug, 16, "Some connections failed to close gracefully during server shutdown."); } public KestrelTrace(ILogger logger) @@ -128,6 +130,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel _connectionDisconnectedWrite(_logger, connectionId, count, ex); } + public virtual void NotAllConnectionsClosedGracefully() + { + _notAllConnectionsClosedGracefully(_logger, null); + } + public virtual void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) { _logger.Log(logLevel, eventId, state, exception, formatter); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/KestrelEngine.cs b/src/Microsoft.AspNetCore.Server.Kestrel/KestrelEngine.cs index 40c3359558..ae4f84735c 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/KestrelEngine.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/KestrelEngine.cs @@ -3,7 +3,7 @@ using System; using System.Collections.Generic; -using Microsoft.AspNetCore.Http; +using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Http; using Microsoft.AspNetCore.Server.Kestrel.Networking; @@ -50,7 +50,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel public IDisposable CreateServer(ServerAddress address) { - var listeners = new List(); + var listeners = new List(); var usingPipes = address.IsUnixPipe; @@ -91,23 +91,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel first = false; } + return new Disposable(() => { - foreach (var listener in listeners) - { - listener.Dispose(); - } + DisposeListeners(listeners); }); } catch { - foreach (var listener in listeners) - { - listener.Dispose(); - } + DisposeListeners(listeners); throw; } } + + private void DisposeListeners(List listeners) + { + var disposeTasks = new List(); + + foreach (var listener in listeners) + { + disposeTasks.Add(listener.DisposeAsync()); + } + + if (!Task.WhenAll(disposeTasks).Wait(ServerInformation.ShutdownTimeout)) + { + Log.NotAllConnectionsClosedGracefully(); + } + } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServer.cs b/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServer.cs index b6768de672..ead9f9b2da 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServer.cs @@ -59,7 +59,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel var trace = new KestrelTrace(_logger); var engine = new KestrelEngine(new ServiceContext { - FrameFactory = (context, remoteEP, localEP, prepareRequest) => + FrameFactory = (context, remoteEP, localEP, prepareRequest) => { return new Frame(application, context, remoteEP, localEP, prepareRequest); }, diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServerInformation.cs b/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServerInformation.cs index b1d4d335ab..4e2f4b0515 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServerInformation.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/KestrelServerInformation.cs @@ -21,6 +21,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel Addresses = GetAddresses(configuration); ThreadCount = GetThreadCount(configuration); + ShutdownTimeout = GetShutdownTimeout(configuration); NoDelay = GetNoDelay(configuration); PoolingParameters = new KestrelServerPoolingParameters(configuration); } @@ -29,6 +30,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel public int ThreadCount { get; set; } + public TimeSpan ShutdownTimeout { get; set; } + public bool NoDelay { get; set; } public KestrelServerPoolingParameters PoolingParameters { get; } @@ -93,6 +96,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel return ProcessorThreadCount; } + private TimeSpan GetShutdownTimeout(IConfiguration configuration) + { + var shutdownTimeoutString = configuration["kestrel.shutdownTimout"]; + + float shutdownTimeout; + if (float.TryParse(shutdownTimeoutString, NumberStyles.Float, CultureInfo.InvariantCulture, out shutdownTimeout)) + { + return TimeSpan.FromSeconds(shutdownTimeout); + } + + return TimeSpan.FromSeconds(5); + } + private static bool GetNoDelay(IConfiguration configuration) { var noDelayString = configuration["kestrel.noDelay"]; diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/TestServiceContext.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/TestServiceContext.cs index 0ac9c5c76d..743044c2eb 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/TestServiceContext.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/TestServiceContext.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Server.Kestrel; using Microsoft.AspNetCore.Server.Kestrel.Filter; @@ -23,11 +24,13 @@ namespace Microsoft.AspNetCore.Server.KestrelTests var configuration = new ConfigurationBuilder().Build(); ServerInformation = new KestrelServerInformation(configuration); + ServerInformation.ShutdownTimeout = TimeSpan.FromSeconds(5); + HttpComponentFactory = new HttpComponentFactory(ServerInformation); } public TestServiceContext(IConnectionFilter filter) - : base() + : this() { ServerInformation.ConnectionFilter = filter; }