Close connections as quickly and gracefully as possible on shutdown
- Make the time given for requests to complete gracefully configurable. - Complete all async reads so calling code can re-check whether to stop request processing and exit if in between requests. - Don't wait for a FIN from the client since some browsers (e.g. IE & Chrome) will take a long time to send one. - Ensure all ConnectionFilters complete before the memory pool is disposed. - Ensure blocks get returned even when a ConnectionFilter produces a failed read
This commit is contained in:
parent
9c31907bac
commit
54caf3071c
|
|
@ -15,6 +15,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
private readonly Stream _filteredStream;
|
||||
private readonly Stream _socketInputStream;
|
||||
private readonly IKestrelTrace _log;
|
||||
private readonly MemoryPool2 _memory;
|
||||
private MemoryPoolBlock2 _block;
|
||||
|
||||
public FilteredStreamAdapter(
|
||||
Stream filteredStream,
|
||||
|
|
@ -28,24 +30,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
_log = logger;
|
||||
_filteredStream = filteredStream;
|
||||
_socketInputStream = new SocketInputStream(SocketInput);
|
||||
|
||||
var block = memory.Lease();
|
||||
// Use pooled block for copy
|
||||
_filteredStream.CopyToAsync(_socketInputStream, block).ContinueWith((task, state) =>
|
||||
{
|
||||
var returnedBlock = task.Result;
|
||||
returnedBlock.Pool.Return(returnedBlock);
|
||||
|
||||
((FilteredStreamAdapter)state).OnStreamClose(task);
|
||||
}, this);
|
||||
_memory = memory;
|
||||
}
|
||||
|
||||
public SocketInput SocketInput { get; private set; }
|
||||
|
||||
public ISocketOutput SocketOutput { get; private set; }
|
||||
|
||||
public Task ReadInputAsync()
|
||||
{
|
||||
_block = _memory.Lease();
|
||||
// Use pooled block for copy
|
||||
return _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) =>
|
||||
{
|
||||
((FilteredStreamAdapter)state).OnStreamClose(task);
|
||||
}, this);
|
||||
}
|
||||
|
||||
private void OnStreamClose(Task copyAsyncTask)
|
||||
{
|
||||
_memory.Return(_block);
|
||||
|
||||
if (copyAsyncTask.IsFaulted)
|
||||
{
|
||||
SocketInput.AbortAwaiting();
|
||||
|
|
|
|||
|
|
@ -9,15 +9,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
{
|
||||
public static class StreamExtensions
|
||||
{
|
||||
public static async Task<MemoryPoolBlock2> CopyToAsync(this Stream source, Stream destination, MemoryPoolBlock2 block)
|
||||
public static async Task CopyToAsync(this Stream source, Stream destination, MemoryPoolBlock2 block)
|
||||
{
|
||||
int bytesRead;
|
||||
while ((bytesRead = await source.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0)
|
||||
{
|
||||
await destination.WriteAsync(block.Array, block.Data.Offset, bytesRead);
|
||||
}
|
||||
|
||||
return block;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
using System;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Filter;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Networking;
|
||||
|
|
@ -31,6 +32,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
|
||||
private readonly object _stateLock = new object();
|
||||
private ConnectionState _connectionState;
|
||||
private TaskCompletionSource<object> _socketClosedTcs;
|
||||
private Task _readFilteredInputTask = TaskUtilities.CompletedTask;
|
||||
|
||||
private IPEndPoint _remoteEndPoint;
|
||||
private IPEndPoint _localEndPoint;
|
||||
|
|
@ -44,6 +47,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
|
||||
_rawSocketInput = new SocketInput(Memory2, ThreadPool);
|
||||
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
|
||||
|
||||
ConnectionManager.AddConnection(_connectionId, this);
|
||||
}
|
||||
|
||||
// Internal for testing
|
||||
internal Connection()
|
||||
{
|
||||
}
|
||||
|
||||
public void Start()
|
||||
|
|
@ -63,11 +73,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
// Don't initialize _frame until SocketInput and SocketOutput are set to their final values.
|
||||
if (ServerInformation.ConnectionFilter == null)
|
||||
{
|
||||
SocketInput = _rawSocketInput;
|
||||
SocketOutput = _rawSocketOutput;
|
||||
lock (_stateLock)
|
||||
{
|
||||
if (_connectionState != ConnectionState.CreatingFrame)
|
||||
{
|
||||
throw new InvalidOperationException("Invalid connection state: " + _connectionState);
|
||||
}
|
||||
|
||||
_frame = CreateFrame();
|
||||
_frame.Start();
|
||||
_connectionState = ConnectionState.Open;
|
||||
|
||||
SocketInput = _rawSocketInput;
|
||||
SocketOutput = _rawSocketOutput;
|
||||
|
||||
_frame = CreateFrame();
|
||||
_frame.Start();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -109,21 +129,51 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
}
|
||||
}
|
||||
|
||||
public virtual void Abort()
|
||||
public Task StopAsync()
|
||||
{
|
||||
if (_frame != null)
|
||||
lock (_stateLock)
|
||||
{
|
||||
// Frame.Abort calls user code while this method is always
|
||||
// called from a libuv thread.
|
||||
System.Threading.ThreadPool.QueueUserWorkItem(state =>
|
||||
switch (_connectionState)
|
||||
{
|
||||
var connection = (Connection)state;
|
||||
connection._frame.Abort();
|
||||
}, this);
|
||||
case ConnectionState.SocketClosed:
|
||||
return _readFilteredInputTask;
|
||||
case ConnectionState.CreatingFrame:
|
||||
_connectionState = ConnectionState.ToDisconnect;
|
||||
break;
|
||||
case ConnectionState.Open:
|
||||
_frame.Stop();
|
||||
SocketInput.CompleteAwaiting();
|
||||
break;
|
||||
}
|
||||
|
||||
_socketClosedTcs = new TaskCompletionSource<object>();
|
||||
return Task.WhenAll(_socketClosedTcs.Task, _readFilteredInputTask);
|
||||
}
|
||||
}
|
||||
|
||||
public void OnSocketClosed()
|
||||
public virtual void Abort()
|
||||
{
|
||||
lock (_stateLock)
|
||||
{
|
||||
if (_connectionState == ConnectionState.CreatingFrame)
|
||||
{
|
||||
_connectionState = ConnectionState.ToDisconnect;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Frame.Abort calls user code while this method is always
|
||||
// called from a libuv thread.
|
||||
System.Threading.ThreadPool.QueueUserWorkItem(state =>
|
||||
{
|
||||
var connection = (Connection)state;
|
||||
connection._frame.Abort();
|
||||
}, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Called on Libuv thread
|
||||
public virtual void OnSocketClosed()
|
||||
{
|
||||
_rawSocketInput.Dispose();
|
||||
|
||||
|
|
@ -133,25 +183,64 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
{
|
||||
SocketInput.Dispose();
|
||||
}
|
||||
|
||||
lock (_stateLock)
|
||||
{
|
||||
_connectionState = ConnectionState.SocketClosed;
|
||||
|
||||
if (_socketClosedTcs != null)
|
||||
{
|
||||
// This is always waited on synchronously, so it's safe to
|
||||
// call on the libuv thread.
|
||||
_socketClosedTcs.TrySetResult(null);
|
||||
}
|
||||
|
||||
if (_readFilteredInputTask.IsCompleted)
|
||||
{
|
||||
ConnectionManager.ConnectionStopped(_connectionId);
|
||||
}
|
||||
else
|
||||
{
|
||||
_readFilteredInputTask.ContinueWith((t, state) =>
|
||||
{
|
||||
var connection = (Connection)state;
|
||||
connection.ConnectionManager.ConnectionStopped(connection._connectionId);
|
||||
}, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ApplyConnectionFilter()
|
||||
{
|
||||
if (_filterContext.Connection != _libuvStream)
|
||||
lock (_stateLock)
|
||||
{
|
||||
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool);
|
||||
if (_connectionState == ConnectionState.CreatingFrame)
|
||||
{
|
||||
_connectionState = ConnectionState.Open;
|
||||
|
||||
SocketInput = filteredStreamAdapter.SocketInput;
|
||||
SocketOutput = filteredStreamAdapter.SocketOutput;
|
||||
}
|
||||
else
|
||||
{
|
||||
SocketInput = _rawSocketInput;
|
||||
SocketOutput = _rawSocketOutput;
|
||||
}
|
||||
if (_filterContext.Connection != _libuvStream)
|
||||
{
|
||||
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool);
|
||||
|
||||
_frame = CreateFrame();
|
||||
_frame.Start();
|
||||
SocketInput = filteredStreamAdapter.SocketInput;
|
||||
SocketOutput = filteredStreamAdapter.SocketOutput;
|
||||
|
||||
_readFilteredInputTask = filteredStreamAdapter.ReadInputAsync();
|
||||
}
|
||||
else
|
||||
{
|
||||
SocketInput = _rawSocketInput;
|
||||
SocketOutput = _rawSocketOutput;
|
||||
}
|
||||
|
||||
_frame = CreateFrame();
|
||||
_frame.Start();
|
||||
}
|
||||
else
|
||||
{
|
||||
ConnectionControl.End(ProduceEndType.SocketDisconnect);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state)
|
||||
|
|
@ -227,16 +316,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
{
|
||||
switch (endType)
|
||||
{
|
||||
case ProduceEndType.SocketShutdownSend:
|
||||
if (_connectionState != ConnectionState.Open)
|
||||
{
|
||||
return;
|
||||
}
|
||||
_connectionState = ConnectionState.Shutdown;
|
||||
|
||||
Log.ConnectionWriteFin(_connectionId);
|
||||
_rawSocketOutput.End(endType);
|
||||
break;
|
||||
case ProduceEndType.ConnectionKeepAlive:
|
||||
if (_connectionState != ConnectionState.Open)
|
||||
{
|
||||
|
|
@ -245,12 +324,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
|
||||
Log.ConnectionKeepAlive(_connectionId);
|
||||
break;
|
||||
case ProduceEndType.SocketShutdown:
|
||||
case ProduceEndType.SocketDisconnect:
|
||||
if (_connectionState == ConnectionState.Disconnected)
|
||||
if (_connectionState == ConnectionState.Disconnecting ||
|
||||
_connectionState == ConnectionState.SocketClosed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
_connectionState = ConnectionState.Disconnected;
|
||||
_connectionState = ConnectionState.Disconnecting;
|
||||
|
||||
Log.ConnectionDisconnect(_connectionId);
|
||||
_rawSocketOutput.End(endType);
|
||||
|
|
@ -261,9 +342,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
|
||||
private enum ConnectionState
|
||||
{
|
||||
CreatingFrame,
|
||||
ToDisconnect,
|
||||
Open,
|
||||
Shutdown,
|
||||
Disconnected
|
||||
Disconnecting,
|
||||
SocketClosed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -145,20 +145,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
// If _requestAborted is set, the connection has already been closed.
|
||||
if (Volatile.Read(ref _requestAborted) == 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Inform client no more data will ever arrive
|
||||
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
|
||||
|
||||
// Wait for client to either disconnect or send unexpected data
|
||||
await SocketInput;
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Ensure we *always* disconnect the socket.
|
||||
// Dispose socket
|
||||
ConnectionControl.End(ProduceEndType.SocketDisconnect);
|
||||
}
|
||||
ConnectionControl.End(ProduceEndType.SocketShutdown);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,12 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
||||
{
|
||||
interface IAsyncDisposable
|
||||
{
|
||||
Task DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
|
@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
/// <summary>
|
||||
/// Base class for listeners in Kestrel. Listens for incoming connections
|
||||
/// </summary>
|
||||
public abstract class Listener : ListenerContext, IDisposable
|
||||
public abstract class Listener : ListenerContext, IAsyncDisposable
|
||||
{
|
||||
protected Listener(ServiceContext serviceContext)
|
||||
: base(serviceContext)
|
||||
|
|
@ -77,7 +77,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
connection.Start();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
public virtual async Task DisposeAsync()
|
||||
{
|
||||
// Ensure the event loop is still running.
|
||||
// If the event loop isn't running and we try to wait on this Post
|
||||
|
|
@ -85,34 +85,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
// the exception that stopped the event loop will never be surfaced.
|
||||
if (Thread.FatalError == null && ListenSocket != null)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<int>(this);
|
||||
Thread.Post(
|
||||
tcs2 =>
|
||||
await Thread.PostAsync(state =>
|
||||
{
|
||||
var listener = (Listener)state;
|
||||
listener.ListenSocket.Dispose();
|
||||
}, this);
|
||||
|
||||
await ConnectionManager.CloseConnectionsAsync();
|
||||
|
||||
await Thread.PostAsync(state =>
|
||||
{
|
||||
var listener = (Listener)state;
|
||||
var writeReqPool = listener.WriteReqPool;
|
||||
while (writeReqPool.Count > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
var socket = (Listener)tcs2.Task.AsyncState;
|
||||
socket.ListenSocket.Dispose();
|
||||
|
||||
var writeReqPool = socket.WriteReqPool;
|
||||
while (writeReqPool.Count > 0)
|
||||
{
|
||||
writeReqPool.Dequeue().Dispose();
|
||||
}
|
||||
|
||||
tcs2.SetResult(0);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
tcs2.SetException(ex);
|
||||
}
|
||||
},
|
||||
tcs);
|
||||
|
||||
// REVIEW: Should we add a timeout here to be safe?
|
||||
tcs.Task.Wait();
|
||||
writeReqPool.Dequeue().Dispose();
|
||||
}
|
||||
}, this);
|
||||
}
|
||||
|
||||
Memory2.Dispose();
|
||||
ListenSocket = null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,13 +11,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
{
|
||||
public ListenerContext()
|
||||
{
|
||||
Memory2 = new MemoryPool2();
|
||||
}
|
||||
|
||||
public ListenerContext(ServiceContext serviceContext)
|
||||
: base(serviceContext)
|
||||
{
|
||||
Memory2 = new MemoryPool2();
|
||||
ConnectionManager = new ConnectionManager();
|
||||
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
|
||||
}
|
||||
|
||||
|
|
@ -27,6 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
ServerAddress = listenerContext.ServerAddress;
|
||||
Thread = listenerContext.Thread;
|
||||
Memory2 = listenerContext.Memory2;
|
||||
ConnectionManager = listenerContext.ConnectionManager;
|
||||
WriteReqPool = listenerContext.WriteReqPool;
|
||||
Log = listenerContext.Log;
|
||||
}
|
||||
|
|
@ -37,6 +38,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
|
||||
public MemoryPool2 Memory2 { get; set; }
|
||||
|
||||
public ConnectionManager ConnectionManager { get; set; }
|
||||
|
||||
public Queue<UvWriteReq> WriteReqPool { get; set; }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,8 +39,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
|
||||
await StartAsync(address, thread).ConfigureAwait(false);
|
||||
|
||||
await Thread.PostAsync(_this => _this.PostCallback(),
|
||||
this).ConfigureAwait(false);
|
||||
await Thread.PostAsync(state => ((ListenerPrimary)state).PostCallback(),
|
||||
this).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private void PostCallback()
|
||||
|
|
@ -100,5 +100,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
socket);
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task DisposeAsync()
|
||||
{
|
||||
// Call base first so the ListenSocket gets closed and doesn't
|
||||
// try to dispatch connections to closed pipes.
|
||||
await base.DisposeAsync();
|
||||
|
||||
if (Thread.FatalError == null && ListenPipe != null)
|
||||
{
|
||||
await Thread.PostAsync(state =>
|
||||
{
|
||||
var listener = (ListenerPrimary)state;
|
||||
listener.ListenPipe.Dispose();
|
||||
|
||||
foreach (var dispatchPipe in listener._dispatchPipes)
|
||||
{
|
||||
dispatchPipe.Dispose();
|
||||
}
|
||||
}, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ using System;
|
|||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Networking;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
|
@ -16,7 +15,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
/// A secondary listener is delegated requests from a primary listener via a named pipe or
|
||||
/// UNIX domain socket.
|
||||
/// </summary>
|
||||
public abstract class ListenerSecondary : ListenerContext, IDisposable
|
||||
public abstract class ListenerSecondary : ListenerContext, IAsyncDisposable
|
||||
{
|
||||
private string _pipeName;
|
||||
private IntPtr _ptr;
|
||||
|
|
@ -155,7 +154,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
// Ensure the event loop is still running.
|
||||
// If the event loop isn't running and we try to wait on this Post
|
||||
|
|
@ -163,16 +162,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
// the exception that stopped the event loop will never be surfaced.
|
||||
if (Thread.FatalError == null)
|
||||
{
|
||||
Thread.Send(listener =>
|
||||
await Thread.PostAsync(state =>
|
||||
{
|
||||
var listener = (ListenerSecondary)state;
|
||||
listener.DispatchPipe.Dispose();
|
||||
listener.FreeBuffer();
|
||||
}, this);
|
||||
|
||||
await ConnectionManager.CloseConnectionsAsync();
|
||||
|
||||
await Thread.PostAsync(state =>
|
||||
{
|
||||
var listener = (ListenerSecondary)state;
|
||||
var writeReqPool = listener.WriteReqPool;
|
||||
while (writeReqPool.Count > 0)
|
||||
{
|
||||
writeReqPool.Dequeue().Dispose();
|
||||
}
|
||||
}, this);
|
||||
}
|
||||
else
|
||||
{
|
||||
FreeBuffer();
|
||||
}
|
||||
|
||||
Memory2.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
{
|
||||
public enum ProduceEndType
|
||||
{
|
||||
SocketShutdownSend,
|
||||
SocketShutdown,
|
||||
SocketDisconnect,
|
||||
ConnectionKeepAlive,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -183,6 +183,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
}
|
||||
}
|
||||
|
||||
public void CompleteAwaiting()
|
||||
{
|
||||
Complete();
|
||||
}
|
||||
|
||||
public void AbortAwaiting()
|
||||
{
|
||||
_awaitableError = new TaskCanceledException("The request was aborted");
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
private const int _initialTaskQueues = 64;
|
||||
private const int _maxPooledWriteContexts = 32;
|
||||
|
||||
private static readonly WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
|
||||
private static readonly Action<object> _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered();
|
||||
|
||||
private readonly KestrelThread _thread;
|
||||
|
|
@ -205,17 +204,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
{
|
||||
switch (endType)
|
||||
{
|
||||
case ProduceEndType.SocketShutdownSend:
|
||||
case ProduceEndType.SocketShutdown:
|
||||
WriteAsync(default(ArraySegment<byte>),
|
||||
default(CancellationToken),
|
||||
socketShutdownSend: true,
|
||||
socketDisconnect: false);
|
||||
socketDisconnect: true,
|
||||
isSync: true);
|
||||
break;
|
||||
case ProduceEndType.SocketDisconnect:
|
||||
WriteAsync(default(ArraySegment<byte>),
|
||||
default(CancellationToken),
|
||||
socketShutdownSend: false,
|
||||
socketDisconnect: true);
|
||||
socketDisconnect: true,
|
||||
isSync: true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -256,6 +257,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
{
|
||||
MemoryPoolBlock2 blockToReturn = null;
|
||||
|
||||
|
||||
lock (_returnLock)
|
||||
{
|
||||
Debug.Assert(!_lastStart.IsDefault);
|
||||
|
|
@ -277,7 +279,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
|
||||
if (blockToReturn != null)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(_returnBlocks, blockToReturn);
|
||||
ReturnBlocks(blockToReturn);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -593,6 +595,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
return;
|
||||
}
|
||||
|
||||
Self._log.ConnectionWriteFin(Self._connectionId);
|
||||
|
||||
var shutdownReq = new UvShutdownReq(Self._log);
|
||||
shutdownReq.Init(Self._thread.Loop);
|
||||
shutdownReq.Shutdown(Self._socket, (_shutdownReq, status, state) =>
|
||||
|
|
@ -618,9 +622,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
return;
|
||||
}
|
||||
|
||||
// Ensure all blocks are returned before calling OnSocketClosed
|
||||
// to ensure the MemoryPool doesn't get disposed too soon.
|
||||
Self.ReturnAllBlocks();
|
||||
Self._socket.Dispose();
|
||||
Self._connection.OnSocketClosed();
|
||||
Self.ReturnAllBlocks();
|
||||
Self._log.ConnectionStop(Self._connectionId);
|
||||
CompleteWithContextLock();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Filter;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel
|
||||
|
|
@ -9,6 +10,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
{
|
||||
int ThreadCount { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The amount of time after the server begins shutting down before connections will be forcefully closed.
|
||||
/// By default, Kestrel will wait 5 seconds for any ongoing requests to complete before terminating
|
||||
/// the connection.
|
||||
/// A custom timeout can be configured using the "kestrel.shutdownTimeout" key in <seealso cref="Microsoft.Extensions.Configuration.IConfiguration"/>.
|
||||
/// The value will be parsed as a float representing the timout in seconds.
|
||||
/// </summary>
|
||||
TimeSpan ShutdownTimeout { get; set; }
|
||||
|
||||
bool NoDelay { get; set; }
|
||||
|
||||
/// <summary>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,59 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Http;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure
|
||||
{
|
||||
public class ConnectionManager
|
||||
{
|
||||
private bool _managerClosed;
|
||||
private ConcurrentDictionary<long, Connection> _activeConnections = new ConcurrentDictionary<long, Connection>();
|
||||
|
||||
public void AddConnection(long connectionId, Connection connection)
|
||||
{
|
||||
if (_managerClosed)
|
||||
{
|
||||
throw new InvalidOperationException(nameof(ConnectionManager) + " closed.");
|
||||
}
|
||||
|
||||
if (!_activeConnections.TryAdd(connectionId, connection))
|
||||
{
|
||||
throw new InvalidOperationException("Connection already added.");
|
||||
}
|
||||
}
|
||||
|
||||
public void ConnectionStopped(long connectionId)
|
||||
{
|
||||
Connection removed;
|
||||
_activeConnections.TryRemove(connectionId, out removed);
|
||||
}
|
||||
|
||||
public Task CloseConnectionsAsync()
|
||||
{
|
||||
if (_managerClosed)
|
||||
{
|
||||
throw new InvalidOperationException(nameof(ConnectionManager) + " already closed.");
|
||||
}
|
||||
|
||||
_managerClosed = true;
|
||||
|
||||
var stopTasks = new List<Task>();
|
||||
|
||||
foreach (var connectionId in _activeConnections.Keys)
|
||||
{
|
||||
Connection removed;
|
||||
if (_activeConnections.TryRemove(connectionId, out removed))
|
||||
{
|
||||
stopTasks.Add(removed.StopAsync());
|
||||
}
|
||||
}
|
||||
|
||||
return Task.WhenAll(stopTasks);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -33,6 +33,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure
|
|||
|
||||
void ConnectionDisconnectedWrite(long connectionId, int count, Exception ex);
|
||||
|
||||
void NotAllConnectionsClosedGracefully();
|
||||
|
||||
void ApplicationError(Exception ex);
|
||||
}
|
||||
}
|
||||
|
|
@ -27,8 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
private static readonly Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state);
|
||||
private static readonly Action<object, object> _socketCallbackAdapter = (callback, state) => ((Action<SocketOutput>)callback).Invoke((SocketOutput)state);
|
||||
private static readonly Action<object, object> _tcsCallbackAdapter = (callback, state) => ((Action<TaskCompletionSource<int>>)callback).Invoke((TaskCompletionSource<int>)state);
|
||||
private static readonly Action<object, object> _listenerPrimaryCallbackAdapter = (callback, state) => ((Action<ListenerPrimary>)callback).Invoke((ListenerPrimary)state);
|
||||
private static readonly Action<object, object> _listenerSecondaryCallbackAdapter = (callback, state) => ((Action<ListenerSecondary>)callback).Invoke((ListenerSecondary)state);
|
||||
private static readonly Action<object, object> _postAsyncCallbackAdapter = (callback, state) => ((Action<object>)callback).Invoke(state);
|
||||
|
||||
private readonly KestrelEngine _engine;
|
||||
private readonly IApplicationLifetime _appLifetime;
|
||||
|
|
@ -78,23 +77,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
return;
|
||||
}
|
||||
|
||||
var stepTimeout = (int)(timeout.TotalMilliseconds / 3);
|
||||
var stepTimeout = (int)(timeout.TotalMilliseconds / 2);
|
||||
|
||||
Post(t => t.OnStop());
|
||||
if (!_thread.Join(stepTimeout))
|
||||
{
|
||||
try
|
||||
{
|
||||
Post(t => t.OnStopRude());
|
||||
Post(t => t.OnStopImmediate());
|
||||
if (!_thread.Join(stepTimeout))
|
||||
{
|
||||
Post(t => t.OnStopImmediate());
|
||||
if (!_thread.Join(stepTimeout))
|
||||
{
|
||||
#if NET451
|
||||
_thread.Abort();
|
||||
_thread.Abort();
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
|
|
@ -118,11 +113,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
|
||||
private void OnStop()
|
||||
{
|
||||
_post.Unreference();
|
||||
}
|
||||
|
||||
private void OnStopRude()
|
||||
{
|
||||
// If the listeners were all disposed gracefully there should be no handles
|
||||
// left to dispose other than _post.
|
||||
// We dispose everything here in the event they are not closed gracefully.
|
||||
_engine.Libuv.walk(
|
||||
_loop,
|
||||
(ptr, arg) =>
|
||||
|
|
@ -134,6 +127,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
}
|
||||
},
|
||||
IntPtr.Zero);
|
||||
|
||||
_post.Unreference();
|
||||
}
|
||||
|
||||
private void OnStopImmediate()
|
||||
|
|
@ -179,14 +174,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
_post.Send();
|
||||
}
|
||||
|
||||
public Task PostAsync(Action<ListenerPrimary> callback, ListenerPrimary state)
|
||||
public Task PostAsync(Action<object> callback, object state)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<object>();
|
||||
lock (_workSync)
|
||||
{
|
||||
_workAdding.Enqueue(new Work
|
||||
{
|
||||
CallbackAdapter = _listenerPrimaryCallbackAdapter,
|
||||
CallbackAdapter = _postAsyncCallbackAdapter,
|
||||
Callback = callback,
|
||||
State = state,
|
||||
Completion = tcs
|
||||
|
|
@ -196,35 +191,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
return tcs.Task;
|
||||
}
|
||||
|
||||
public Task PostAsync(Action<ListenerSecondary> callback, ListenerSecondary state)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<object>();
|
||||
lock (_workSync)
|
||||
{
|
||||
_workAdding.Enqueue(new Work
|
||||
{
|
||||
CallbackAdapter = _listenerSecondaryCallbackAdapter,
|
||||
Callback = callback,
|
||||
State = state,
|
||||
Completion = tcs
|
||||
});
|
||||
}
|
||||
_post.Send();
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
public void Send(Action<ListenerSecondary> callback, ListenerSecondary state)
|
||||
{
|
||||
if (_loop.ThreadId == Thread.CurrentThread.ManagedThreadId)
|
||||
{
|
||||
callback.Invoke(state);
|
||||
}
|
||||
else
|
||||
{
|
||||
PostAsync(callback, state).Wait();
|
||||
}
|
||||
}
|
||||
|
||||
private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
|
||||
{
|
||||
lock (_workSync)
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
private static readonly Action<ILogger, long, Exception> _connectionDisconnect;
|
||||
private static readonly Action<ILogger, long, Exception> _connectionError;
|
||||
private static readonly Action<ILogger, long, int, Exception> _connectionDisconnectedWrite;
|
||||
private static readonly Action<ILogger, Exception> _notAllConnectionsClosedGracefully;
|
||||
|
||||
protected readonly ILogger _logger;
|
||||
|
||||
|
|
@ -37,12 +38,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
_connectionWriteFin = LoggerMessage.Define<long>(LogLevel.Debug, 7, @"Connection id ""{ConnectionId}"" sending FIN.");
|
||||
_connectionWroteFin = LoggerMessage.Define<long, int>(LogLevel.Debug, 8, @"Connection id ""{ConnectionId}"" sent FIN with status ""{Status}"".");
|
||||
_connectionKeepAlive = LoggerMessage.Define<long>(LogLevel.Debug, 9, @"Connection id ""{ConnectionId}"" completed keep alive response.");
|
||||
_connectionDisconnect = LoggerMessage.Define<long>(LogLevel.Debug, 10, @"Connection id ""{ConnectionId}"" disconnected.");
|
||||
_connectionDisconnect = LoggerMessage.Define<long>(LogLevel.Debug, 10, @"Connection id ""{ConnectionId}"" disconnecting.");
|
||||
// ConnectionWrite: Reserved: 11
|
||||
// ConnectionWriteCallback: Reserved: 12
|
||||
// ApplicationError: Reserved: 13 - LoggerMessage.Define overload not present
|
||||
_connectionError = LoggerMessage.Define<long>(LogLevel.Information, 14, @"Connection id ""{ConnectionId}"" communication error");
|
||||
_connectionDisconnectedWrite = LoggerMessage.Define<long, int>(LogLevel.Debug, 15, @"Connection id ""{ConnectionId}"" write of ""{count}"" bytes to disconnected client.");
|
||||
_notAllConnectionsClosedGracefully = LoggerMessage.Define(LogLevel.Debug, 16, "Some connections failed to close gracefully during server shutdown.");
|
||||
}
|
||||
|
||||
public KestrelTrace(ILogger logger)
|
||||
|
|
@ -128,6 +130,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
_connectionDisconnectedWrite(_logger, connectionId, count, ex);
|
||||
}
|
||||
|
||||
public virtual void NotAllConnectionsClosedGracefully()
|
||||
{
|
||||
_notAllConnectionsClosedGracefully(_logger, null);
|
||||
}
|
||||
|
||||
public virtual void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
|
||||
{
|
||||
_logger.Log(logLevel, eventId, state, exception, formatter);
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Networking;
|
||||
|
||||
|
|
@ -50,7 +50,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
|
||||
public IDisposable CreateServer(ServerAddress address)
|
||||
{
|
||||
var listeners = new List<IDisposable>();
|
||||
var listeners = new List<IAsyncDisposable>();
|
||||
|
||||
var usingPipes = address.IsUnixPipe;
|
||||
|
||||
|
|
@ -91,23 +91,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
|
||||
first = false;
|
||||
}
|
||||
|
||||
return new Disposable(() =>
|
||||
{
|
||||
foreach (var listener in listeners)
|
||||
{
|
||||
listener.Dispose();
|
||||
}
|
||||
DisposeListeners(listeners);
|
||||
});
|
||||
}
|
||||
catch
|
||||
{
|
||||
foreach (var listener in listeners)
|
||||
{
|
||||
listener.Dispose();
|
||||
}
|
||||
DisposeListeners(listeners);
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private void DisposeListeners(List<IAsyncDisposable> listeners)
|
||||
{
|
||||
var disposeTasks = new List<Task>();
|
||||
|
||||
foreach (var listener in listeners)
|
||||
{
|
||||
disposeTasks.Add(listener.DisposeAsync());
|
||||
}
|
||||
|
||||
if (!Task.WhenAll(disposeTasks).Wait(ServerInformation.ShutdownTimeout))
|
||||
{
|
||||
Log.NotAllConnectionsClosedGracefully();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
var trace = new KestrelTrace(_logger);
|
||||
var engine = new KestrelEngine(new ServiceContext
|
||||
{
|
||||
FrameFactory = (context, remoteEP, localEP, prepareRequest) =>
|
||||
FrameFactory = (context, remoteEP, localEP, prepareRequest) =>
|
||||
{
|
||||
return new Frame<TContext>(application, context, remoteEP, localEP, prepareRequest);
|
||||
},
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
|
||||
Addresses = GetAddresses(configuration);
|
||||
ThreadCount = GetThreadCount(configuration);
|
||||
ShutdownTimeout = GetShutdownTimeout(configuration);
|
||||
NoDelay = GetNoDelay(configuration);
|
||||
PoolingParameters = new KestrelServerPoolingParameters(configuration);
|
||||
}
|
||||
|
|
@ -29,6 +30,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
|
||||
public int ThreadCount { get; set; }
|
||||
|
||||
public TimeSpan ShutdownTimeout { get; set; }
|
||||
|
||||
public bool NoDelay { get; set; }
|
||||
|
||||
public KestrelServerPoolingParameters PoolingParameters { get; }
|
||||
|
|
@ -93,6 +96,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel
|
|||
return ProcessorThreadCount;
|
||||
}
|
||||
|
||||
private TimeSpan GetShutdownTimeout(IConfiguration configuration)
|
||||
{
|
||||
var shutdownTimeoutString = configuration["kestrel.shutdownTimout"];
|
||||
|
||||
float shutdownTimeout;
|
||||
if (float.TryParse(shutdownTimeoutString, NumberStyles.Float, CultureInfo.InvariantCulture, out shutdownTimeout))
|
||||
{
|
||||
return TimeSpan.FromSeconds(shutdownTimeout);
|
||||
}
|
||||
|
||||
return TimeSpan.FromSeconds(5);
|
||||
}
|
||||
|
||||
private static bool GetNoDelay(IConfiguration configuration)
|
||||
{
|
||||
var noDelayString = configuration["kestrel.noDelay"];
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Filter;
|
||||
|
|
@ -23,11 +24,13 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
|
||||
var configuration = new ConfigurationBuilder().Build();
|
||||
ServerInformation = new KestrelServerInformation(configuration);
|
||||
ServerInformation.ShutdownTimeout = TimeSpan.FromSeconds(5);
|
||||
|
||||
HttpComponentFactory = new HttpComponentFactory(ServerInformation);
|
||||
}
|
||||
|
||||
public TestServiceContext(IConnectionFilter filter)
|
||||
: base()
|
||||
: this()
|
||||
{
|
||||
ServerInformation.ConnectionFilter = filter;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue