diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs index 1e99ef698b..d93888ffb5 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/FilteredStreamAdapter.cs @@ -37,11 +37,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter public ISocketOutput SocketOutput { get; private set; } - public Task ReadInputAsync() + public void ReadInput() { _block = _memory.Lease(); // Use pooled block for copy - return _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) => + _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) => { ((FilteredStreamAdapter)state).OnStreamClose(task); }, this); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs index 6f4d79670e..d49738193e 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs @@ -33,7 +33,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http private readonly object _stateLock = new object(); private ConnectionState _connectionState; private TaskCompletionSource _socketClosedTcs; - private Task _readFilteredInputTask = TaskUtilities.CompletedTask; private IPEndPoint _remoteEndPoint; private IPEndPoint _localEndPoint; @@ -41,14 +40,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http public Connection(ListenerContext context, UvStreamHandle socket) : base(context) { _socket = socket; + socket.Connection = this; ConnectionControl = this; _connectionId = Interlocked.Increment(ref _lastConnectionId); _rawSocketInput = new SocketInput(Memory2, ThreadPool); _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool); - - ConnectionManager.AddConnection(_connectionId, this); } // Internal for testing @@ -136,7 +134,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http switch (_connectionState) { case ConnectionState.SocketClosed: - return _readFilteredInputTask; + return TaskUtilities.CompletedTask; case ConnectionState.CreatingFrame: _connectionState = ConnectionState.ToDisconnect; break; @@ -147,7 +145,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } _socketClosedTcs = new TaskCompletionSource(); - return Task.WhenAll(_socketClosedTcs.Task, _readFilteredInputTask); + return _socketClosedTcs.Task; } } @@ -194,19 +192,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http // call on the libuv thread. _socketClosedTcs.TrySetResult(null); } - - if (_readFilteredInputTask.IsCompleted) - { - ConnectionManager.ConnectionStopped(_connectionId); - } - else - { - _readFilteredInputTask.ContinueWith((t, state) => - { - var connection = (Connection)state; - connection.ConnectionManager.ConnectionStopped(connection._connectionId); - }, this); - } } } @@ -225,7 +210,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http SocketInput = filteredStreamAdapter.SocketInput; SocketOutput = filteredStreamAdapter.SocketOutput; - _readFilteredInputTask = filteredStreamAdapter.ReadInputAsync(); + filteredStreamAdapter.ReadInput(); } else { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ConnectionManager.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ConnectionManager.cs new file mode 100644 index 0000000000..0d807aed1f --- /dev/null +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ConnectionManager.cs @@ -0,0 +1,53 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Server.Kestrel.Networking; + +namespace Microsoft.AspNetCore.Server.Kestrel.Http +{ + public class ConnectionManager + { + private KestrelThread _thread; + private List _connectionStopTasks; + + public ConnectionManager(KestrelThread thread) + { + _thread = thread; + } + + // This must be called on the libuv event loop + public void WalkConnectionsAndClose() + { + if (_connectionStopTasks != null) + { + throw new InvalidOperationException(nameof(WalkConnectionsAndClose) + " cannot be called twice."); + } + + _connectionStopTasks = new List(); + + _thread.Walk(ptr => + { + var handle = UvMemory.FromIntPtr(ptr); + var connection = (handle as UvStreamHandle)?.Connection; + + if (connection != null) + { + _connectionStopTasks.Add(connection.StopAsync()); + } + }); + } + + public Task WaitForConnectionCloseAsync() + { + if (_connectionStopTasks == null) + { + throw new InvalidOperationException(nameof(WalkConnectionsAndClose) + " must be called first."); + } + + return Task.WhenAll(_connectionStopTasks); + } + } +} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs index 7fee42cec3..76b64dacba 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Listener.cs @@ -13,6 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http /// public abstract class Listener : ListenerContext, IAsyncDisposable { + private bool _closed; + protected Listener(ServiceContext serviceContext) : base(serviceContext) { @@ -26,6 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { ServerAddress = address; Thread = thread; + ConnectionManager = new ConnectionManager(thread); var tcs = new TaskCompletionSource(this); @@ -55,11 +58,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http protected static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state) { var listener = (Listener) state; + if (error != null) { listener.Log.LogError(0, error, "Listener.ConnectionCallback"); } - else + else if (!listener._closed) { listener.OnConnection(stream, status); } @@ -90,14 +94,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { var listener = (Listener)state; listener.ListenSocket.Dispose(); + + listener._closed = true; + + listener.ConnectionManager.WalkConnectionsAndClose(); }, this); - await ConnectionManager.CloseConnectionsAsync(); + await ConnectionManager.WaitForConnectionCloseAsync(); await Thread.PostAsync(state => { - var listener = (Listener)state; - var writeReqPool = listener.WriteReqPool; + var writeReqPool = ((Listener)state).WriteReqPool; while (writeReqPool.Count > 0) { writeReqPool.Dequeue().Dispose(); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs index 1b8e090aa2..a012c15326 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerContext.cs @@ -17,7 +17,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http : base(serviceContext) { Memory2 = new MemoryPool2(); - ConnectionManager = new ConnectionManager(); WriteReqPool = new Queue(SocketOutput.MaxPooledWriteReqs); } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs index 8d21c16dcd..10426c1cc9 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ListenerSecondary.cs @@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http private string _pipeName; private IntPtr _ptr; private Libuv.uv_buf_t _buf; + private bool _closed; protected ListenerSecondary(ServiceContext serviceContext) : base(serviceContext) { @@ -38,6 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http ServerAddress = address; Thread = thread; + ConnectionManager = new ConnectionManager(thread); DispatchPipe = new UvPipeHandle(Log); @@ -118,7 +120,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http return; } - if (DispatchPipe.PendingCount() == 0) + if (_closed || DispatchPipe.PendingCount() == 0) { return; } @@ -167,9 +169,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http var listener = (ListenerSecondary)state; listener.DispatchPipe.Dispose(); listener.FreeBuffer(); + + listener._closed = true; + + listener.ConnectionManager.WalkConnectionsAndClose(); }, this); - await ConnectionManager.CloseConnectionsAsync(); + await ConnectionManager.WaitForConnectionCloseAsync(); await Thread.PostAsync(state => { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs index 4ed52a12b7..6e9e4bebf1 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs @@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http private const int _initialTaskQueues = 64; private const int _maxPooledWriteContexts = 32; + private static readonly WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); private static readonly Action _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered(); private readonly KestrelThread _thread; @@ -279,7 +280,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http if (blockToReturn != null) { - ReturnBlocks(blockToReturn); + ThreadPool.QueueUserWorkItem(_returnBlocks, blockToReturn); } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/TcpListener.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/TcpListener.cs index fc845b5239..1b1927e27c 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/TcpListener.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/TcpListener.cs @@ -44,7 +44,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http acceptSocket.NoDelay(ServerInformation.NoDelay); listenSocket.Accept(acceptSocket); DispatchConnection(acceptSocket); - } catch (UvException ex) { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/ConnectionManager.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/ConnectionManager.cs deleted file mode 100644 index c4a64c561c..0000000000 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/ConnectionManager.cs +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Http; - -namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure -{ - public class ConnectionManager - { - private bool _managerClosed; - private ConcurrentDictionary _activeConnections = new ConcurrentDictionary(); - - public void AddConnection(long connectionId, Connection connection) - { - if (_managerClosed) - { - throw new InvalidOperationException(nameof(ConnectionManager) + " closed."); - } - - if (!_activeConnections.TryAdd(connectionId, connection)) - { - throw new InvalidOperationException("Connection already added."); - } - } - - public void ConnectionStopped(long connectionId) - { - Connection removed; - _activeConnections.TryRemove(connectionId, out removed); - } - - public Task CloseConnectionsAsync() - { - if (_managerClosed) - { - throw new InvalidOperationException(nameof(ConnectionManager) + " already closed."); - } - - _managerClosed = true; - - var stopTasks = new List(); - - foreach (var connectionId in _activeConnections.Keys) - { - Connection removed; - if (_activeConnections.TryRemove(connectionId, out removed)) - { - stopTasks.Add(removed.StopAsync()); - } - } - - return Task.WhenAll(stopTasks); - } - } -} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs index e6a40d024a..1cbeef9fd9 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -7,7 +7,6 @@ using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Server.Kestrel.Http; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Networking; using Microsoft.Extensions.Logging; @@ -72,6 +71,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel return tcs.Task; } + // This must be called from the libuv event loop. + public void AllowStop() + { + _post.Unreference(); + } + public void Stop(TimeSpan timeout) { if (!_initCompleted) @@ -79,19 +84,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel return; } - var stepTimeout = (int)(timeout.TotalMilliseconds / 2); - - Post(t => t.OnStop()); - if (!_thread.Join(stepTimeout)) + if (_thread.IsAlive) { + var stepTimeout = (int)(timeout.TotalMilliseconds / 2); try { - Post(t => t.OnStopImmediate()); + Post(t => t.OnStopRude()); if (!_thread.Join(stepTimeout)) { + Post(t => t.OnStopImmediate()); + if (!_thread.Join(stepTimeout)) + { #if NET451 - _thread.Abort(); + _thread.Abort(); #endif + } } } catch (ObjectDisposedException) @@ -113,23 +120,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel } } - private void OnStop() + private void OnStopRude() { - // If the listeners were all disposed gracefully there should be no handles - // left to dispose other than _post. - // We dispose everything here in the event they are not closed gracefully. - _engine.Libuv.walk( - _loop, - (ptr, arg) => + Walk(ptr => + { + var handle = UvMemory.FromIntPtr(ptr); + if (handle != _post) { - var handle = UvMemory.FromIntPtr(ptr); - if (handle != _post) - { - handle.Dispose(); - } - }, - IntPtr.Zero); + handle.Dispose(); + } + }); + // uv_unref is idempotent so it's OK to call this here and in AllowStop. _post.Unreference(); } @@ -175,6 +177,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel return tcs.Task; } + public void Walk(Action callback) + { + _engine.Libuv.walk( + _loop, + (ptr, arg) => + { + callback(ptr); + }, + IntPtr.Zero); + } + private void PostCloseHandle(Action callback, IntPtr handle) { EnqueueCloseHandle(callback, handle); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPool2.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPool2.cs index 5aab1ff902..9eb0ebfe60 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPool2.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPool2.cs @@ -138,7 +138,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure /// The block to return. It must have been acquired by calling Lease on the same memory pool instance. public void Return(MemoryPoolBlock2 block) { - Debug.Assert(block.Pool == this, "Returned block was leased from this pool"); + Debug.Assert(block.Pool == this, "Returned block was not leased from this pool"); if (block.Slab != null && block.Slab.IsActive) { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs index 2a4a732a96..0dead6c6ae 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs @@ -77,7 +77,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure ~MemoryPoolBlock2() { Debug.Assert(!_pinHandle.IsAllocated, "Ad-hoc memory block wasn't unpinned"); - Debug.Assert(Slab == null, "Block being garbage collected instead of returned to pool"); + Debug.Assert(Slab == null || !Slab.IsActive, "Block being garbage collected instead of returned to pool"); if (_pinHandle.IsAllocated) { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Networking/UvStreamHandle.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Networking/UvStreamHandle.cs index 266fd96a13..eb94d78ef0 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Networking/UvStreamHandle.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Networking/UvStreamHandle.cs @@ -3,6 +3,7 @@ using System; using System.Runtime.InteropServices; +using Microsoft.AspNetCore.Server.Kestrel.Http; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.Extensions.Logging; @@ -28,6 +29,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking { } + public Connection Connection { get; set; } + protected override bool ReleaseHandle() { if (_listenVitality.IsAllocated)