Close connections as quickly and gracefully as possible on shutdown

- Make the time given for requests to complete gracefully configurable.
- Complete all async reads so calling code can re-check whether to stop
  request processing and exit if in between requests.
- Don't wait for a FIN from the client since some browsers (e.g. IE & Chrome)
  will take a long time to send one.
- Ensure all ConnectionFilters complete before the memory pool is disposed.
- Ensure blocks get returned even when a ConnectionFilter produces a failed read
This commit is contained in:
Stephen Halter 2016-02-09 12:30:19 -08:00
parent 9c31907bac
commit 54caf3071c
21 changed files with 364 additions and 165 deletions

View File

@ -15,6 +15,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
private readonly Stream _filteredStream; private readonly Stream _filteredStream;
private readonly Stream _socketInputStream; private readonly Stream _socketInputStream;
private readonly IKestrelTrace _log; private readonly IKestrelTrace _log;
private readonly MemoryPool2 _memory;
private MemoryPoolBlock2 _block;
public FilteredStreamAdapter( public FilteredStreamAdapter(
Stream filteredStream, Stream filteredStream,
@ -28,24 +30,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
_log = logger; _log = logger;
_filteredStream = filteredStream; _filteredStream = filteredStream;
_socketInputStream = new SocketInputStream(SocketInput); _socketInputStream = new SocketInputStream(SocketInput);
_memory = memory;
var block = memory.Lease();
// Use pooled block for copy
_filteredStream.CopyToAsync(_socketInputStream, block).ContinueWith((task, state) =>
{
var returnedBlock = task.Result;
returnedBlock.Pool.Return(returnedBlock);
((FilteredStreamAdapter)state).OnStreamClose(task);
}, this);
} }
public SocketInput SocketInput { get; private set; } public SocketInput SocketInput { get; private set; }
public ISocketOutput SocketOutput { get; private set; } public ISocketOutput SocketOutput { get; private set; }
public Task ReadInputAsync()
{
_block = _memory.Lease();
// Use pooled block for copy
return _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) =>
{
((FilteredStreamAdapter)state).OnStreamClose(task);
}, this);
}
private void OnStreamClose(Task copyAsyncTask) private void OnStreamClose(Task copyAsyncTask)
{ {
_memory.Return(_block);
if (copyAsyncTask.IsFaulted) if (copyAsyncTask.IsFaulted)
{ {
SocketInput.AbortAwaiting(); SocketInput.AbortAwaiting();

View File

@ -9,15 +9,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
{ {
public static class StreamExtensions public static class StreamExtensions
{ {
public static async Task<MemoryPoolBlock2> CopyToAsync(this Stream source, Stream destination, MemoryPoolBlock2 block) public static async Task CopyToAsync(this Stream source, Stream destination, MemoryPoolBlock2 block)
{ {
int bytesRead; int bytesRead;
while ((bytesRead = await source.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0) while ((bytesRead = await source.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0)
{ {
await destination.WriteAsync(block.Array, block.Data.Offset, bytesRead); await destination.WriteAsync(block.Array, block.Data.Offset, bytesRead);
} }
return block;
} }
} }
} }

View File

@ -4,6 +4,7 @@
using System; using System;
using System.Net; using System.Net;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Filter; using Microsoft.AspNetCore.Server.Kestrel.Filter;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Networking; using Microsoft.AspNetCore.Server.Kestrel.Networking;
@ -31,6 +32,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private readonly object _stateLock = new object(); private readonly object _stateLock = new object();
private ConnectionState _connectionState; private ConnectionState _connectionState;
private TaskCompletionSource<object> _socketClosedTcs;
private Task _readFilteredInputTask = TaskUtilities.CompletedTask;
private IPEndPoint _remoteEndPoint; private IPEndPoint _remoteEndPoint;
private IPEndPoint _localEndPoint; private IPEndPoint _localEndPoint;
@ -44,6 +47,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
_rawSocketInput = new SocketInput(Memory2, ThreadPool); _rawSocketInput = new SocketInput(Memory2, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool); _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
ConnectionManager.AddConnection(_connectionId, this);
}
// Internal for testing
internal Connection()
{
} }
public void Start() public void Start()
@ -63,11 +73,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
// Don't initialize _frame until SocketInput and SocketOutput are set to their final values. // Don't initialize _frame until SocketInput and SocketOutput are set to their final values.
if (ServerInformation.ConnectionFilter == null) if (ServerInformation.ConnectionFilter == null)
{ {
SocketInput = _rawSocketInput; lock (_stateLock)
SocketOutput = _rawSocketOutput; {
if (_connectionState != ConnectionState.CreatingFrame)
{
throw new InvalidOperationException("Invalid connection state: " + _connectionState);
}
_frame = CreateFrame(); _connectionState = ConnectionState.Open;
_frame.Start();
SocketInput = _rawSocketInput;
SocketOutput = _rawSocketOutput;
_frame = CreateFrame();
_frame.Start();
}
} }
else else
{ {
@ -109,21 +129,51 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
} }
} }
public virtual void Abort() public Task StopAsync()
{ {
if (_frame != null) lock (_stateLock)
{ {
// Frame.Abort calls user code while this method is always switch (_connectionState)
// called from a libuv thread.
System.Threading.ThreadPool.QueueUserWorkItem(state =>
{ {
var connection = (Connection)state; case ConnectionState.SocketClosed:
connection._frame.Abort(); return _readFilteredInputTask;
}, this); case ConnectionState.CreatingFrame:
_connectionState = ConnectionState.ToDisconnect;
break;
case ConnectionState.Open:
_frame.Stop();
SocketInput.CompleteAwaiting();
break;
}
_socketClosedTcs = new TaskCompletionSource<object>();
return Task.WhenAll(_socketClosedTcs.Task, _readFilteredInputTask);
} }
} }
public void OnSocketClosed() public virtual void Abort()
{
lock (_stateLock)
{
if (_connectionState == ConnectionState.CreatingFrame)
{
_connectionState = ConnectionState.ToDisconnect;
}
else
{
// Frame.Abort calls user code while this method is always
// called from a libuv thread.
System.Threading.ThreadPool.QueueUserWorkItem(state =>
{
var connection = (Connection)state;
connection._frame.Abort();
}, this);
}
}
}
// Called on Libuv thread
public virtual void OnSocketClosed()
{ {
_rawSocketInput.Dispose(); _rawSocketInput.Dispose();
@ -133,25 +183,64 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{ {
SocketInput.Dispose(); SocketInput.Dispose();
} }
lock (_stateLock)
{
_connectionState = ConnectionState.SocketClosed;
if (_socketClosedTcs != null)
{
// This is always waited on synchronously, so it's safe to
// call on the libuv thread.
_socketClosedTcs.TrySetResult(null);
}
if (_readFilteredInputTask.IsCompleted)
{
ConnectionManager.ConnectionStopped(_connectionId);
}
else
{
_readFilteredInputTask.ContinueWith((t, state) =>
{
var connection = (Connection)state;
connection.ConnectionManager.ConnectionStopped(connection._connectionId);
}, this);
}
}
} }
private void ApplyConnectionFilter() private void ApplyConnectionFilter()
{ {
if (_filterContext.Connection != _libuvStream) lock (_stateLock)
{ {
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool); if (_connectionState == ConnectionState.CreatingFrame)
{
_connectionState = ConnectionState.Open;
SocketInput = filteredStreamAdapter.SocketInput; if (_filterContext.Connection != _libuvStream)
SocketOutput = filteredStreamAdapter.SocketOutput; {
} var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool);
else
{
SocketInput = _rawSocketInput;
SocketOutput = _rawSocketOutput;
}
_frame = CreateFrame(); SocketInput = filteredStreamAdapter.SocketInput;
_frame.Start(); SocketOutput = filteredStreamAdapter.SocketOutput;
_readFilteredInputTask = filteredStreamAdapter.ReadInputAsync();
}
else
{
SocketInput = _rawSocketInput;
SocketOutput = _rawSocketOutput;
}
_frame = CreateFrame();
_frame.Start();
}
else
{
ConnectionControl.End(ProduceEndType.SocketDisconnect);
}
}
} }
private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state) private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state)
@ -227,16 +316,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{ {
switch (endType) switch (endType)
{ {
case ProduceEndType.SocketShutdownSend:
if (_connectionState != ConnectionState.Open)
{
return;
}
_connectionState = ConnectionState.Shutdown;
Log.ConnectionWriteFin(_connectionId);
_rawSocketOutput.End(endType);
break;
case ProduceEndType.ConnectionKeepAlive: case ProduceEndType.ConnectionKeepAlive:
if (_connectionState != ConnectionState.Open) if (_connectionState != ConnectionState.Open)
{ {
@ -245,12 +324,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
Log.ConnectionKeepAlive(_connectionId); Log.ConnectionKeepAlive(_connectionId);
break; break;
case ProduceEndType.SocketShutdown:
case ProduceEndType.SocketDisconnect: case ProduceEndType.SocketDisconnect:
if (_connectionState == ConnectionState.Disconnected) if (_connectionState == ConnectionState.Disconnecting ||
_connectionState == ConnectionState.SocketClosed)
{ {
return; return;
} }
_connectionState = ConnectionState.Disconnected; _connectionState = ConnectionState.Disconnecting;
Log.ConnectionDisconnect(_connectionId); Log.ConnectionDisconnect(_connectionId);
_rawSocketOutput.End(endType); _rawSocketOutput.End(endType);
@ -261,9 +342,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private enum ConnectionState private enum ConnectionState
{ {
CreatingFrame,
ToDisconnect,
Open, Open,
Shutdown, Disconnecting,
Disconnected SocketClosed
} }
} }
} }

View File

@ -145,20 +145,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
// If _requestAborted is set, the connection has already been closed. // If _requestAborted is set, the connection has already been closed.
if (Volatile.Read(ref _requestAborted) == 0) if (Volatile.Read(ref _requestAborted) == 0)
{ {
try ConnectionControl.End(ProduceEndType.SocketShutdown);
{
// Inform client no more data will ever arrive
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
// Wait for client to either disconnect or send unexpected data
await SocketInput;
}
finally
{
// Ensure we *always* disconnect the socket.
// Dispose socket
ConnectionControl.End(ProduceEndType.SocketDisconnect);
}
} }
} }
catch (Exception ex) catch (Exception ex)

View File

@ -0,0 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
interface IAsyncDisposable
{
Task DisposeAsync();
}
}

View File

@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
/// <summary> /// <summary>
/// Base class for listeners in Kestrel. Listens for incoming connections /// Base class for listeners in Kestrel. Listens for incoming connections
/// </summary> /// </summary>
public abstract class Listener : ListenerContext, IDisposable public abstract class Listener : ListenerContext, IAsyncDisposable
{ {
protected Listener(ServiceContext serviceContext) protected Listener(ServiceContext serviceContext)
: base(serviceContext) : base(serviceContext)
@ -77,7 +77,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
connection.Start(); connection.Start();
} }
public void Dispose() public virtual async Task DisposeAsync()
{ {
// Ensure the event loop is still running. // Ensure the event loop is still running.
// If the event loop isn't running and we try to wait on this Post // If the event loop isn't running and we try to wait on this Post
@ -85,34 +85,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
// the exception that stopped the event loop will never be surfaced. // the exception that stopped the event loop will never be surfaced.
if (Thread.FatalError == null && ListenSocket != null) if (Thread.FatalError == null && ListenSocket != null)
{ {
var tcs = new TaskCompletionSource<int>(this); await Thread.PostAsync(state =>
Thread.Post( {
tcs2 => var listener = (Listener)state;
listener.ListenSocket.Dispose();
}, this);
await ConnectionManager.CloseConnectionsAsync();
await Thread.PostAsync(state =>
{
var listener = (Listener)state;
var writeReqPool = listener.WriteReqPool;
while (writeReqPool.Count > 0)
{ {
try writeReqPool.Dequeue().Dispose();
{ }
var socket = (Listener)tcs2.Task.AsyncState; }, this);
socket.ListenSocket.Dispose();
var writeReqPool = socket.WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();
}
tcs2.SetResult(0);
}
catch (Exception ex)
{
tcs2.SetException(ex);
}
},
tcs);
// REVIEW: Should we add a timeout here to be safe?
tcs.Task.Wait();
} }
Memory2.Dispose();
ListenSocket = null; ListenSocket = null;
} }
} }

View File

@ -11,13 +11,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{ {
public ListenerContext() public ListenerContext()
{ {
Memory2 = new MemoryPool2();
} }
public ListenerContext(ServiceContext serviceContext) public ListenerContext(ServiceContext serviceContext)
: base(serviceContext) : base(serviceContext)
{ {
Memory2 = new MemoryPool2(); Memory2 = new MemoryPool2();
ConnectionManager = new ConnectionManager();
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs); WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
} }
@ -27,6 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
ServerAddress = listenerContext.ServerAddress; ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread; Thread = listenerContext.Thread;
Memory2 = listenerContext.Memory2; Memory2 = listenerContext.Memory2;
ConnectionManager = listenerContext.ConnectionManager;
WriteReqPool = listenerContext.WriteReqPool; WriteReqPool = listenerContext.WriteReqPool;
Log = listenerContext.Log; Log = listenerContext.Log;
} }
@ -37,6 +38,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
public MemoryPool2 Memory2 { get; set; } public MemoryPool2 Memory2 { get; set; }
public ConnectionManager ConnectionManager { get; set; }
public Queue<UvWriteReq> WriteReqPool { get; set; } public Queue<UvWriteReq> WriteReqPool { get; set; }
} }
} }

View File

@ -39,8 +39,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
await StartAsync(address, thread).ConfigureAwait(false); await StartAsync(address, thread).ConfigureAwait(false);
await Thread.PostAsync(_this => _this.PostCallback(), await Thread.PostAsync(state => ((ListenerPrimary)state).PostCallback(),
this).ConfigureAwait(false); this).ConfigureAwait(false);
} }
private void PostCallback() private void PostCallback()
@ -100,5 +100,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
socket); socket);
} }
} }
public override async Task DisposeAsync()
{
// Call base first so the ListenSocket gets closed and doesn't
// try to dispatch connections to closed pipes.
await base.DisposeAsync();
if (Thread.FatalError == null && ListenPipe != null)
{
await Thread.PostAsync(state =>
{
var listener = (ListenerPrimary)state;
listener.ListenPipe.Dispose();
foreach (var dispatchPipe in listener._dispatchPipes)
{
dispatchPipe.Dispose();
}
}, this);
}
}
} }
} }

View File

@ -5,7 +5,6 @@ using System;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Networking; using Microsoft.AspNetCore.Server.Kestrel.Networking;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -16,7 +15,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
/// A secondary listener is delegated requests from a primary listener via a named pipe or /// A secondary listener is delegated requests from a primary listener via a named pipe or
/// UNIX domain socket. /// UNIX domain socket.
/// </summary> /// </summary>
public abstract class ListenerSecondary : ListenerContext, IDisposable public abstract class ListenerSecondary : ListenerContext, IAsyncDisposable
{ {
private string _pipeName; private string _pipeName;
private IntPtr _ptr; private IntPtr _ptr;
@ -155,7 +154,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
} }
} }
public void Dispose() public async Task DisposeAsync()
{ {
// Ensure the event loop is still running. // Ensure the event loop is still running.
// If the event loop isn't running and we try to wait on this Post // If the event loop isn't running and we try to wait on this Post
@ -163,16 +162,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
// the exception that stopped the event loop will never be surfaced. // the exception that stopped the event loop will never be surfaced.
if (Thread.FatalError == null) if (Thread.FatalError == null)
{ {
Thread.Send(listener => await Thread.PostAsync(state =>
{ {
var listener = (ListenerSecondary)state;
listener.DispatchPipe.Dispose(); listener.DispatchPipe.Dispose();
listener.FreeBuffer(); listener.FreeBuffer();
}, this); }, this);
await ConnectionManager.CloseConnectionsAsync();
await Thread.PostAsync(state =>
{
var listener = (ListenerSecondary)state;
var writeReqPool = listener.WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();
}
}, this);
} }
else else
{ {
FreeBuffer(); FreeBuffer();
} }
Memory2.Dispose();
} }
} }
} }

View File

@ -5,7 +5,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{ {
public enum ProduceEndType public enum ProduceEndType
{ {
SocketShutdownSend, SocketShutdown,
SocketDisconnect, SocketDisconnect,
ConnectionKeepAlive, ConnectionKeepAlive,
} }

View File

@ -183,6 +183,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
} }
} }
public void CompleteAwaiting()
{
Complete();
}
public void AbortAwaiting() public void AbortAwaiting()
{ {
_awaitableError = new TaskCanceledException("The request was aborted"); _awaitableError = new TaskCanceledException("The request was aborted");

View File

@ -20,7 +20,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private const int _initialTaskQueues = 64; private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32; private const int _maxPooledWriteContexts = 32;
private static readonly WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
private static readonly Action<object> _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered(); private static readonly Action<object> _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered();
private readonly KestrelThread _thread; private readonly KestrelThread _thread;
@ -205,17 +204,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{ {
switch (endType) switch (endType)
{ {
case ProduceEndType.SocketShutdownSend: case ProduceEndType.SocketShutdown:
WriteAsync(default(ArraySegment<byte>), WriteAsync(default(ArraySegment<byte>),
default(CancellationToken), default(CancellationToken),
socketShutdownSend: true, socketShutdownSend: true,
socketDisconnect: false); socketDisconnect: true,
isSync: true);
break; break;
case ProduceEndType.SocketDisconnect: case ProduceEndType.SocketDisconnect:
WriteAsync(default(ArraySegment<byte>), WriteAsync(default(ArraySegment<byte>),
default(CancellationToken), default(CancellationToken),
socketShutdownSend: false, socketShutdownSend: false,
socketDisconnect: true); socketDisconnect: true,
isSync: true);
break; break;
} }
} }
@ -256,6 +257,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{ {
MemoryPoolBlock2 blockToReturn = null; MemoryPoolBlock2 blockToReturn = null;
lock (_returnLock) lock (_returnLock)
{ {
Debug.Assert(!_lastStart.IsDefault); Debug.Assert(!_lastStart.IsDefault);
@ -277,7 +279,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
if (blockToReturn != null) if (blockToReturn != null)
{ {
ThreadPool.QueueUserWorkItem(_returnBlocks, blockToReturn); ReturnBlocks(blockToReturn);
} }
} }
@ -593,6 +595,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
return; return;
} }
Self._log.ConnectionWriteFin(Self._connectionId);
var shutdownReq = new UvShutdownReq(Self._log); var shutdownReq = new UvShutdownReq(Self._log);
shutdownReq.Init(Self._thread.Loop); shutdownReq.Init(Self._thread.Loop);
shutdownReq.Shutdown(Self._socket, (_shutdownReq, status, state) => shutdownReq.Shutdown(Self._socket, (_shutdownReq, status, state) =>
@ -618,9 +622,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
return; return;
} }
// Ensure all blocks are returned before calling OnSocketClosed
// to ensure the MemoryPool doesn't get disposed too soon.
Self.ReturnAllBlocks();
Self._socket.Dispose(); Self._socket.Dispose();
Self._connection.OnSocketClosed(); Self._connection.OnSocketClosed();
Self.ReturnAllBlocks();
Self._log.ConnectionStop(Self._connectionId); Self._log.ConnectionStop(Self._connectionId);
CompleteWithContextLock(); CompleteWithContextLock();
} }

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved. // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.Server.Kestrel.Filter; using Microsoft.AspNetCore.Server.Kestrel.Filter;
namespace Microsoft.AspNetCore.Server.Kestrel namespace Microsoft.AspNetCore.Server.Kestrel
@ -9,6 +10,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel
{ {
int ThreadCount { get; set; } int ThreadCount { get; set; }
/// <summary>
/// The amount of time after the server begins shutting down before connections will be forcefully closed.
/// By default, Kestrel will wait 5 seconds for any ongoing requests to complete before terminating
/// the connection.
/// A custom timeout can be configured using the "kestrel.shutdownTimeout" key in <seealso cref="Microsoft.Extensions.Configuration.IConfiguration"/>.
/// The value will be parsed as a float representing the timout in seconds.
/// </summary>
TimeSpan ShutdownTimeout { get; set; }
bool NoDelay { get; set; } bool NoDelay { get; set; }
/// <summary> /// <summary>

View File

@ -0,0 +1,59 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Http;
namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure
{
public class ConnectionManager
{
private bool _managerClosed;
private ConcurrentDictionary<long, Connection> _activeConnections = new ConcurrentDictionary<long, Connection>();
public void AddConnection(long connectionId, Connection connection)
{
if (_managerClosed)
{
throw new InvalidOperationException(nameof(ConnectionManager) + " closed.");
}
if (!_activeConnections.TryAdd(connectionId, connection))
{
throw new InvalidOperationException("Connection already added.");
}
}
public void ConnectionStopped(long connectionId)
{
Connection removed;
_activeConnections.TryRemove(connectionId, out removed);
}
public Task CloseConnectionsAsync()
{
if (_managerClosed)
{
throw new InvalidOperationException(nameof(ConnectionManager) + " already closed.");
}
_managerClosed = true;
var stopTasks = new List<Task>();
foreach (var connectionId in _activeConnections.Keys)
{
Connection removed;
if (_activeConnections.TryRemove(connectionId, out removed))
{
stopTasks.Add(removed.StopAsync());
}
}
return Task.WhenAll(stopTasks);
}
}
}

View File

@ -33,6 +33,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure
void ConnectionDisconnectedWrite(long connectionId, int count, Exception ex); void ConnectionDisconnectedWrite(long connectionId, int count, Exception ex);
void NotAllConnectionsClosedGracefully();
void ApplicationError(Exception ex); void ApplicationError(Exception ex);
} }
} }

View File

@ -27,8 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
private static readonly Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state); private static readonly Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state);
private static readonly Action<object, object> _socketCallbackAdapter = (callback, state) => ((Action<SocketOutput>)callback).Invoke((SocketOutput)state); private static readonly Action<object, object> _socketCallbackAdapter = (callback, state) => ((Action<SocketOutput>)callback).Invoke((SocketOutput)state);
private static readonly Action<object, object> _tcsCallbackAdapter = (callback, state) => ((Action<TaskCompletionSource<int>>)callback).Invoke((TaskCompletionSource<int>)state); private static readonly Action<object, object> _tcsCallbackAdapter = (callback, state) => ((Action<TaskCompletionSource<int>>)callback).Invoke((TaskCompletionSource<int>)state);
private static readonly Action<object, object> _listenerPrimaryCallbackAdapter = (callback, state) => ((Action<ListenerPrimary>)callback).Invoke((ListenerPrimary)state); private static readonly Action<object, object> _postAsyncCallbackAdapter = (callback, state) => ((Action<object>)callback).Invoke(state);
private static readonly Action<object, object> _listenerSecondaryCallbackAdapter = (callback, state) => ((Action<ListenerSecondary>)callback).Invoke((ListenerSecondary)state);
private readonly KestrelEngine _engine; private readonly KestrelEngine _engine;
private readonly IApplicationLifetime _appLifetime; private readonly IApplicationLifetime _appLifetime;
@ -78,23 +77,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel
return; return;
} }
var stepTimeout = (int)(timeout.TotalMilliseconds / 3); var stepTimeout = (int)(timeout.TotalMilliseconds / 2);
Post(t => t.OnStop()); Post(t => t.OnStop());
if (!_thread.Join(stepTimeout)) if (!_thread.Join(stepTimeout))
{ {
try try
{ {
Post(t => t.OnStopRude()); Post(t => t.OnStopImmediate());
if (!_thread.Join(stepTimeout)) if (!_thread.Join(stepTimeout))
{ {
Post(t => t.OnStopImmediate());
if (!_thread.Join(stepTimeout))
{
#if NET451 #if NET451
_thread.Abort(); _thread.Abort();
#endif #endif
}
} }
} }
catch (ObjectDisposedException) catch (ObjectDisposedException)
@ -118,11 +113,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel
private void OnStop() private void OnStop()
{ {
_post.Unreference(); // If the listeners were all disposed gracefully there should be no handles
} // left to dispose other than _post.
// We dispose everything here in the event they are not closed gracefully.
private void OnStopRude()
{
_engine.Libuv.walk( _engine.Libuv.walk(
_loop, _loop,
(ptr, arg) => (ptr, arg) =>
@ -134,6 +127,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel
} }
}, },
IntPtr.Zero); IntPtr.Zero);
_post.Unreference();
} }
private void OnStopImmediate() private void OnStopImmediate()
@ -179,14 +174,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel
_post.Send(); _post.Send();
} }
public Task PostAsync(Action<ListenerPrimary> callback, ListenerPrimary state) public Task PostAsync(Action<object> callback, object state)
{ {
var tcs = new TaskCompletionSource<object>(); var tcs = new TaskCompletionSource<object>();
lock (_workSync) lock (_workSync)
{ {
_workAdding.Enqueue(new Work _workAdding.Enqueue(new Work
{ {
CallbackAdapter = _listenerPrimaryCallbackAdapter, CallbackAdapter = _postAsyncCallbackAdapter,
Callback = callback, Callback = callback,
State = state, State = state,
Completion = tcs Completion = tcs
@ -196,35 +191,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel
return tcs.Task; return tcs.Task;
} }
public Task PostAsync(Action<ListenerSecondary> callback, ListenerSecondary state)
{
var tcs = new TaskCompletionSource<object>();
lock (_workSync)
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _listenerSecondaryCallbackAdapter,
Callback = callback,
State = state,
Completion = tcs
});
}
_post.Send();
return tcs.Task;
}
public void Send(Action<ListenerSecondary> callback, ListenerSecondary state)
{
if (_loop.ThreadId == Thread.CurrentThread.ManagedThreadId)
{
callback.Invoke(state);
}
else
{
PostAsync(callback, state).Wait();
}
}
private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle) private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
{ {
lock (_workSync) lock (_workSync)

View File

@ -23,6 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
private static readonly Action<ILogger, long, Exception> _connectionDisconnect; private static readonly Action<ILogger, long, Exception> _connectionDisconnect;
private static readonly Action<ILogger, long, Exception> _connectionError; private static readonly Action<ILogger, long, Exception> _connectionError;
private static readonly Action<ILogger, long, int, Exception> _connectionDisconnectedWrite; private static readonly Action<ILogger, long, int, Exception> _connectionDisconnectedWrite;
private static readonly Action<ILogger, Exception> _notAllConnectionsClosedGracefully;
protected readonly ILogger _logger; protected readonly ILogger _logger;
@ -37,12 +38,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel
_connectionWriteFin = LoggerMessage.Define<long>(LogLevel.Debug, 7, @"Connection id ""{ConnectionId}"" sending FIN."); _connectionWriteFin = LoggerMessage.Define<long>(LogLevel.Debug, 7, @"Connection id ""{ConnectionId}"" sending FIN.");
_connectionWroteFin = LoggerMessage.Define<long, int>(LogLevel.Debug, 8, @"Connection id ""{ConnectionId}"" sent FIN with status ""{Status}""."); _connectionWroteFin = LoggerMessage.Define<long, int>(LogLevel.Debug, 8, @"Connection id ""{ConnectionId}"" sent FIN with status ""{Status}"".");
_connectionKeepAlive = LoggerMessage.Define<long>(LogLevel.Debug, 9, @"Connection id ""{ConnectionId}"" completed keep alive response."); _connectionKeepAlive = LoggerMessage.Define<long>(LogLevel.Debug, 9, @"Connection id ""{ConnectionId}"" completed keep alive response.");
_connectionDisconnect = LoggerMessage.Define<long>(LogLevel.Debug, 10, @"Connection id ""{ConnectionId}"" disconnected."); _connectionDisconnect = LoggerMessage.Define<long>(LogLevel.Debug, 10, @"Connection id ""{ConnectionId}"" disconnecting.");
// ConnectionWrite: Reserved: 11 // ConnectionWrite: Reserved: 11
// ConnectionWriteCallback: Reserved: 12 // ConnectionWriteCallback: Reserved: 12
// ApplicationError: Reserved: 13 - LoggerMessage.Define overload not present // ApplicationError: Reserved: 13 - LoggerMessage.Define overload not present
_connectionError = LoggerMessage.Define<long>(LogLevel.Information, 14, @"Connection id ""{ConnectionId}"" communication error"); _connectionError = LoggerMessage.Define<long>(LogLevel.Information, 14, @"Connection id ""{ConnectionId}"" communication error");
_connectionDisconnectedWrite = LoggerMessage.Define<long, int>(LogLevel.Debug, 15, @"Connection id ""{ConnectionId}"" write of ""{count}"" bytes to disconnected client."); _connectionDisconnectedWrite = LoggerMessage.Define<long, int>(LogLevel.Debug, 15, @"Connection id ""{ConnectionId}"" write of ""{count}"" bytes to disconnected client.");
_notAllConnectionsClosedGracefully = LoggerMessage.Define(LogLevel.Debug, 16, "Some connections failed to close gracefully during server shutdown.");
} }
public KestrelTrace(ILogger logger) public KestrelTrace(ILogger logger)
@ -128,6 +130,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel
_connectionDisconnectedWrite(_logger, connectionId, count, ex); _connectionDisconnectedWrite(_logger, connectionId, count, ex);
} }
public virtual void NotAllConnectionsClosedGracefully()
{
_notAllConnectionsClosedGracefully(_logger, null);
}
public virtual void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter) public virtual void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{ {
_logger.Log(logLevel, eventId, state, exception, formatter); _logger.Log(logLevel, eventId, state, exception, formatter);

View File

@ -3,7 +3,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using Microsoft.AspNetCore.Http; using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Http; using Microsoft.AspNetCore.Server.Kestrel.Http;
using Microsoft.AspNetCore.Server.Kestrel.Networking; using Microsoft.AspNetCore.Server.Kestrel.Networking;
@ -50,7 +50,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
public IDisposable CreateServer(ServerAddress address) public IDisposable CreateServer(ServerAddress address)
{ {
var listeners = new List<IDisposable>(); var listeners = new List<IAsyncDisposable>();
var usingPipes = address.IsUnixPipe; var usingPipes = address.IsUnixPipe;
@ -91,23 +91,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel
first = false; first = false;
} }
return new Disposable(() => return new Disposable(() =>
{ {
foreach (var listener in listeners) DisposeListeners(listeners);
{
listener.Dispose();
}
}); });
} }
catch catch
{ {
foreach (var listener in listeners) DisposeListeners(listeners);
{
listener.Dispose();
}
throw; throw;
} }
} }
private void DisposeListeners(List<IAsyncDisposable> listeners)
{
var disposeTasks = new List<Task>();
foreach (var listener in listeners)
{
disposeTasks.Add(listener.DisposeAsync());
}
if (!Task.WhenAll(disposeTasks).Wait(ServerInformation.ShutdownTimeout))
{
Log.NotAllConnectionsClosedGracefully();
}
}
} }
} }

View File

@ -59,7 +59,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
var trace = new KestrelTrace(_logger); var trace = new KestrelTrace(_logger);
var engine = new KestrelEngine(new ServiceContext var engine = new KestrelEngine(new ServiceContext
{ {
FrameFactory = (context, remoteEP, localEP, prepareRequest) => FrameFactory = (context, remoteEP, localEP, prepareRequest) =>
{ {
return new Frame<TContext>(application, context, remoteEP, localEP, prepareRequest); return new Frame<TContext>(application, context, remoteEP, localEP, prepareRequest);
}, },

View File

@ -21,6 +21,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
Addresses = GetAddresses(configuration); Addresses = GetAddresses(configuration);
ThreadCount = GetThreadCount(configuration); ThreadCount = GetThreadCount(configuration);
ShutdownTimeout = GetShutdownTimeout(configuration);
NoDelay = GetNoDelay(configuration); NoDelay = GetNoDelay(configuration);
PoolingParameters = new KestrelServerPoolingParameters(configuration); PoolingParameters = new KestrelServerPoolingParameters(configuration);
} }
@ -29,6 +30,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel
public int ThreadCount { get; set; } public int ThreadCount { get; set; }
public TimeSpan ShutdownTimeout { get; set; }
public bool NoDelay { get; set; } public bool NoDelay { get; set; }
public KestrelServerPoolingParameters PoolingParameters { get; } public KestrelServerPoolingParameters PoolingParameters { get; }
@ -93,6 +96,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel
return ProcessorThreadCount; return ProcessorThreadCount;
} }
private TimeSpan GetShutdownTimeout(IConfiguration configuration)
{
var shutdownTimeoutString = configuration["kestrel.shutdownTimout"];
float shutdownTimeout;
if (float.TryParse(shutdownTimeoutString, NumberStyles.Float, CultureInfo.InvariantCulture, out shutdownTimeout))
{
return TimeSpan.FromSeconds(shutdownTimeout);
}
return TimeSpan.FromSeconds(5);
}
private static bool GetNoDelay(IConfiguration configuration) private static bool GetNoDelay(IConfiguration configuration)
{ {
var noDelayString = configuration["kestrel.noDelay"]; var noDelayString = configuration["kestrel.noDelay"];

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved. // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel; using Microsoft.AspNetCore.Server.Kestrel;
using Microsoft.AspNetCore.Server.Kestrel.Filter; using Microsoft.AspNetCore.Server.Kestrel.Filter;
@ -23,11 +24,13 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var configuration = new ConfigurationBuilder().Build(); var configuration = new ConfigurationBuilder().Build();
ServerInformation = new KestrelServerInformation(configuration); ServerInformation = new KestrelServerInformation(configuration);
ServerInformation.ShutdownTimeout = TimeSpan.FromSeconds(5);
HttpComponentFactory = new HttpComponentFactory(ServerInformation); HttpComponentFactory = new HttpComponentFactory(ServerInformation);
} }
public TestServiceContext(IConnectionFilter filter) public TestServiceContext(IConnectionFilter filter)
: base() : this()
{ {
ServerInformation.ConnectionFilter = filter; ServerInformation.ConnectionFilter = filter;
} }