Stop KestrelThreads in parallel
This commit is contained in:
parent
dc12f3150e
commit
b5c117695e
|
|
@ -21,16 +21,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
_threadPool = threadPool;
|
||||
}
|
||||
|
||||
public bool WalkConnectionsAndClose(TimeSpan timeout)
|
||||
public async Task<bool> WalkConnectionsAndCloseAsync(TimeSpan timeout)
|
||||
{
|
||||
var wh = new ManualResetEventSlim();
|
||||
var tcs = new TaskCompletionSource<object>();
|
||||
|
||||
_thread.Post(state => ((ConnectionManager)state).WalkConnectionsAndCloseCore(wh), this);
|
||||
_thread.Post(state => ((ConnectionManager)state).WalkConnectionsAndCloseCore(tcs), this);
|
||||
|
||||
return wh.Wait(timeout);
|
||||
return await Task.WhenAny(tcs.Task, Task.Delay(timeout)).ConfigureAwait(false) == tcs.Task;
|
||||
}
|
||||
|
||||
private void WalkConnectionsAndCloseCore(ManualResetEventSlim wh)
|
||||
private void WalkConnectionsAndCloseCore(TaskCompletionSource<object> tcs)
|
||||
{
|
||||
var connectionStopTasks = new List<Task>();
|
||||
|
||||
|
|
@ -48,7 +48,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
_threadPool.Run(() =>
|
||||
{
|
||||
Task.WaitAll(connectionStopTasks.ToArray());
|
||||
wh.Set();
|
||||
tcs.SetResult(null);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
private readonly KestrelEngine _engine;
|
||||
private readonly IApplicationLifetime _appLifetime;
|
||||
private readonly Thread _thread;
|
||||
private readonly TaskCompletionSource<object> _threadTcs = new TaskCompletionSource<object>();
|
||||
private readonly UvLoopHandle _loop;
|
||||
private readonly UvAsyncHandle _post;
|
||||
private Queue<Work> _workAdding = new Queue<Work>(1024);
|
||||
|
|
@ -89,13 +90,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
return tcs.Task;
|
||||
}
|
||||
|
||||
// This must be called from the libuv event loop.
|
||||
public void AllowStop()
|
||||
{
|
||||
_post.Unreference();
|
||||
}
|
||||
|
||||
public void Stop(TimeSpan timeout)
|
||||
public async Task StopAsync(TimeSpan timeout)
|
||||
{
|
||||
lock (_startSync)
|
||||
{
|
||||
|
|
@ -105,27 +100,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
}
|
||||
}
|
||||
|
||||
var stepTimeout = (int)(timeout.TotalMilliseconds / 3);
|
||||
|
||||
if (_thread.IsAlive)
|
||||
if (!_threadTcs.Task.IsCompleted)
|
||||
{
|
||||
// These operations need to run on the libuv thread so it only makes
|
||||
// sense to attempt execution if it's still running
|
||||
DisposeConnections();
|
||||
await DisposeConnectionsAsync().ConfigureAwait(false);
|
||||
|
||||
var stepTimeout = TimeSpan.FromTicks(timeout.Ticks / 3);
|
||||
|
||||
Post(t => t.AllowStop());
|
||||
if (!_thread.Join(stepTimeout))
|
||||
if (!await WaitAsync(_threadTcs.Task, stepTimeout).ConfigureAwait(false))
|
||||
{
|
||||
|
||||
try
|
||||
{
|
||||
Post(t => t.OnStopRude());
|
||||
if (!_thread.Join(stepTimeout))
|
||||
if (!await WaitAsync(_threadTcs.Task, stepTimeout).ConfigureAwait(false))
|
||||
{
|
||||
Post(t => t.OnStopImmediate());
|
||||
if (!_thread.Join(stepTimeout))
|
||||
if (!await WaitAsync(_threadTcs.Task, stepTimeout).ConfigureAwait(false))
|
||||
{
|
||||
_log.LogError(0, null, "KestrelThread.Stop failed to terminate libuv thread.");
|
||||
_log.LogError(0, null, "KestrelThread.StopAsync failed to terminate libuv thread.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -133,9 +127,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
{
|
||||
// REVIEW: Should we log something here?
|
||||
// Until we rework this logic, ODEs are bound to happen sometimes.
|
||||
if (!_thread.Join(stepTimeout))
|
||||
if (!await WaitAsync(_threadTcs.Task, stepTimeout).ConfigureAwait(false))
|
||||
{
|
||||
_log.LogError(0, null, "KestrelThread.Stop failed to terminate libuv thread.");
|
||||
_log.LogError(0, null, "KestrelThread.StopAsync failed to terminate libuv thread.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -147,22 +141,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
}
|
||||
}
|
||||
|
||||
private void DisposeConnections()
|
||||
private async Task DisposeConnectionsAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
// Close and wait for all connections
|
||||
if (!ConnectionManager.WalkConnectionsAndClose(_shutdownTimeout))
|
||||
if (!await ConnectionManager.WalkConnectionsAndCloseAsync(_shutdownTimeout).ConfigureAwait(false))
|
||||
{
|
||||
_log.LogError(0, null, "Waiting for connections timed out");
|
||||
_log.NotAllConnectionsClosedGracefully();
|
||||
}
|
||||
|
||||
var result = PostAsync(state =>
|
||||
var result = await WaitAsync(PostAsync(state =>
|
||||
{
|
||||
var listener = (KestrelThread)state;
|
||||
listener.WriteReqPool.Dispose();
|
||||
},
|
||||
this).Wait(_shutdownTimeout);
|
||||
this), _shutdownTimeout).ConfigureAwait(false);
|
||||
|
||||
if (!result)
|
||||
{
|
||||
|
|
@ -175,6 +169,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private void AllowStop()
|
||||
{
|
||||
_post.Unreference();
|
||||
}
|
||||
|
||||
private void OnStopRude()
|
||||
{
|
||||
Walk(ptr =>
|
||||
|
|
@ -262,7 +262,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
{
|
||||
lock (_startSync)
|
||||
{
|
||||
var tcs = (TaskCompletionSource<int>)parameter;
|
||||
var tcs = (TaskCompletionSource<int>) parameter;
|
||||
try
|
||||
{
|
||||
_loop.Init(_engine.Libuv);
|
||||
|
|
@ -302,6 +302,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
// in Stop which should be observable.
|
||||
_appLifetime.StopApplication();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_threadTcs.SetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void OnPost()
|
||||
|
|
@ -385,6 +389,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
return wasWork;
|
||||
}
|
||||
|
||||
private static async Task<bool> WaitAsync(Task task, TimeSpan timeout)
|
||||
{
|
||||
return await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false) == task;
|
||||
}
|
||||
|
||||
private struct Work
|
||||
{
|
||||
public Action<object, object> CallbackAdapter;
|
||||
|
|
|
|||
|
|
@ -3,9 +3,11 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
||||
{
|
||||
|
|
@ -41,10 +43,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
|
||||
public void Dispose()
|
||||
{
|
||||
foreach (var thread in Threads)
|
||||
{
|
||||
thread.Stop(TimeSpan.FromSeconds(2.5));
|
||||
}
|
||||
Task.WaitAll(Threads.Select(thread => thread.StopAsync(TimeSpan.FromSeconds(2.5))).ToArray());
|
||||
|
||||
Threads.Clear();
|
||||
}
|
||||
|
||||
|
|
@ -107,16 +107,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
|
|||
|
||||
private void DisposeListeners(List<IAsyncDisposable> listeners)
|
||||
{
|
||||
var disposeTasks = new List<Task>();
|
||||
var disposeTasks = listeners.Select(listener => listener.DisposeAsync()).ToArray();
|
||||
|
||||
foreach (var listener in listeners)
|
||||
if (!Task.WaitAll(disposeTasks, TimeSpan.FromSeconds(2.5)))
|
||||
{
|
||||
disposeTasks.Add(listener.DisposeAsync());
|
||||
}
|
||||
|
||||
if (!Task.WhenAll(disposeTasks).Wait(ServerOptions.ShutdownTimeout))
|
||||
{
|
||||
Log.NotAllConnectionsClosedGracefully();
|
||||
Log.LogError(0, null, "Disposing listeners failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue