Clean up timer code with custom awaitable (#1935)
- Gets rid of locks by using an event loop
This commit is contained in:
parent
cb5bba36fc
commit
8cc851ff9b
|
|
@ -28,8 +28,17 @@ namespace Microsoft.AspNetCore.Http.Connections
|
|||
private static readonly Action<ILogger, Exception> _scanningConnections =
|
||||
LoggerMessage.Define(LogLevel.Trace, new EventId(6, "ScanningConnections"), "Scanning connections.");
|
||||
|
||||
private static readonly Action<ILogger, Exception> _scanningConnectionsFailed =
|
||||
LoggerMessage.Define(LogLevel.Error, new EventId(7, "ScanningConnectionsFailed"), "Scanning connections failed.");
|
||||
|
||||
private static readonly Action<ILogger, TimeSpan, Exception> _scannedConnections =
|
||||
LoggerMessage.Define<TimeSpan>(LogLevel.Trace, new EventId(7, "ScannedConnections"), "Scanned connections in {Duration}.");
|
||||
LoggerMessage.Define<TimeSpan>(LogLevel.Trace, new EventId(8, "ScannedConnections"), "Scanned connections in {Duration}.");
|
||||
|
||||
private static readonly Action<ILogger, Exception> _heartbeatStarted =
|
||||
LoggerMessage.Define(LogLevel.Trace, new EventId(9, "HeartBeatStarted"), "Starting connection heartbeat.");
|
||||
|
||||
private static readonly Action<ILogger, Exception> _heartbeatEnded =
|
||||
LoggerMessage.Define(LogLevel.Trace, new EventId(10, "HeartBeatEnded"), "Ending connection heartbeat.");
|
||||
|
||||
public static void CreatedNewConnection(ILogger logger, string connectionId)
|
||||
{
|
||||
|
|
@ -61,10 +70,25 @@ namespace Microsoft.AspNetCore.Http.Connections
|
|||
_scanningConnections(logger, null);
|
||||
}
|
||||
|
||||
public static void ScanningConnectionsFailed(ILogger logger, Exception exception)
|
||||
{
|
||||
_scanningConnectionsFailed(logger, exception);
|
||||
}
|
||||
|
||||
public static void ScannedConnections(ILogger logger, TimeSpan duration)
|
||||
{
|
||||
_scannedConnections(logger, duration, null);
|
||||
}
|
||||
|
||||
public static void HeartBeatStarted(ILogger logger)
|
||||
{
|
||||
_heartbeatStarted(logger, null);
|
||||
}
|
||||
|
||||
public static void HeartBeatEnded(ILogger logger)
|
||||
{
|
||||
_heartbeatEnded(logger, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,11 +28,9 @@ namespace Microsoft.AspNetCore.Http.Connections
|
|||
|
||||
private readonly ConcurrentDictionary<string, (HttpConnectionContext Connection, ValueStopwatch Timer)> _connections =
|
||||
new ConcurrentDictionary<string, (HttpConnectionContext Connection, ValueStopwatch Timer)>(StringComparer.Ordinal);
|
||||
private Timer _timer;
|
||||
private readonly TimerAwaitable _nextHeartbeat;
|
||||
private readonly ILogger<HttpConnectionManager> _logger;
|
||||
private readonly ILogger<HttpConnectionContext> _connectionLogger;
|
||||
private readonly object _executionLock = new object();
|
||||
private bool _disposed;
|
||||
|
||||
public HttpConnectionManager(ILoggerFactory loggerFactory, IApplicationLifetime appLifetime)
|
||||
{
|
||||
|
|
@ -40,22 +38,15 @@ namespace Microsoft.AspNetCore.Http.Connections
|
|||
_connectionLogger = loggerFactory.CreateLogger<HttpConnectionContext>();
|
||||
appLifetime.ApplicationStarted.Register(() => Start());
|
||||
appLifetime.ApplicationStopping.Register(() => CloseConnections());
|
||||
_nextHeartbeat = new TimerAwaitable(_heartbeatTickRate, _heartbeatTickRate);
|
||||
}
|
||||
|
||||
public void Start()
|
||||
{
|
||||
lock (_executionLock)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
_nextHeartbeat.Start();
|
||||
|
||||
if (_timer == null)
|
||||
{
|
||||
_timer = new Timer(Scan, this, _heartbeatTickRate, _heartbeatTickRate);
|
||||
}
|
||||
}
|
||||
// Start the timer loop
|
||||
_ = ExecuteTimerLoop();
|
||||
}
|
||||
|
||||
public bool TryGetConnection(string id, out HttpConnectionContext connection)
|
||||
|
|
@ -114,118 +105,98 @@ namespace Microsoft.AspNetCore.Http.Connections
|
|||
return WebEncoders.Base64UrlEncode(buffer);
|
||||
}
|
||||
|
||||
private static void Scan(object state)
|
||||
private async Task ExecuteTimerLoop()
|
||||
{
|
||||
((HttpConnectionManager)state).Scan();
|
||||
}
|
||||
Log.HeartBeatStarted(_logger);
|
||||
|
||||
public void Scan()
|
||||
{
|
||||
// If we couldn't get the lock it means one of 2 things are true:
|
||||
// - We're about to dispose so we don't care to run the scan callback anyways.
|
||||
// - The previous Scan took long enough that the next scan tried to run in parallel
|
||||
// In either case just do nothing and end the timer callback as soon as possible
|
||||
if (!Monitor.TryEnter(_executionLock))
|
||||
// Dispose the timer when all the code consuming callbacks has completed
|
||||
using (_nextHeartbeat)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (_disposed)
|
||||
// The TimerAwaitable will return true until Stop is called
|
||||
while (await _nextHeartbeat)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Pause the timer while we're running
|
||||
_timer?.Change(Timeout.Infinite, Timeout.Infinite);
|
||||
|
||||
// Time the scan so we know if it gets slower than 1sec
|
||||
var timer = ValueStopwatch.StartNew();
|
||||
HttpConnectionsEventSource.Log.ScanningConnections();
|
||||
Log.ScanningConnections(_logger);
|
||||
|
||||
// Scan the registered connections looking for ones that have timed out
|
||||
foreach (var c in _connections)
|
||||
{
|
||||
HttpConnectionContext.ConnectionStatus status;
|
||||
DateTimeOffset lastSeenUtc;
|
||||
var connection = c.Value.Connection;
|
||||
|
||||
try
|
||||
{
|
||||
connection.Lock.Wait();
|
||||
|
||||
// Capture the connection state
|
||||
status = connection.Status;
|
||||
|
||||
lastSeenUtc = connection.LastSeenUtc;
|
||||
await ScanAsync();
|
||||
}
|
||||
finally
|
||||
catch (Exception ex)
|
||||
{
|
||||
connection.Lock.Release();
|
||||
}
|
||||
|
||||
// Once the decision has been made to dispose we don't check the status again
|
||||
// But don't clean up connections while the debugger is attached.
|
||||
if (!Debugger.IsAttached && status == HttpConnectionContext.ConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
|
||||
{
|
||||
Log.ConnectionTimedOut(_logger, connection.ConnectionId);
|
||||
HttpConnectionsEventSource.Log.ConnectionTimedOut(connection.ConnectionId);
|
||||
|
||||
// This is most likely a long polling connection. The transport here ends because
|
||||
// a poll completed and has been inactive for > 5 seconds so we wait for the
|
||||
// application to finish gracefully
|
||||
_ = DisposeAndRemoveAsync(connection, closeGracefully: true);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Tick the heartbeat, if the connection is still active
|
||||
connection.TickHeartbeat();
|
||||
Log.ScanningConnectionsFailed(_logger, ex);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: We could use this timer to determine if the connection scanner is too slow, but we need an idea of what "too slow" is.
|
||||
var elapsed = timer.GetElapsedTime();
|
||||
HttpConnectionsEventSource.Log.ScannedConnections(elapsed);
|
||||
Log.ScannedConnections(_logger, elapsed);
|
||||
|
||||
// Resume once we finished processing all connections
|
||||
_timer?.Change(_heartbeatTickRate, _heartbeatTickRate);
|
||||
}
|
||||
finally
|
||||
|
||||
Log.HeartBeatEnded(_logger);
|
||||
}
|
||||
|
||||
public async Task ScanAsync()
|
||||
{
|
||||
// Time the scan so we know if it gets slower than 1sec
|
||||
var timer = ValueStopwatch.StartNew();
|
||||
HttpConnectionsEventSource.Log.ScanningConnections();
|
||||
Log.ScanningConnections(_logger);
|
||||
|
||||
// Scan the registered connections looking for ones that have timed out
|
||||
foreach (var c in _connections)
|
||||
{
|
||||
// Exit the lock now
|
||||
Monitor.Exit(_executionLock);
|
||||
HttpConnectionContext.ConnectionStatus status;
|
||||
DateTimeOffset lastSeenUtc;
|
||||
var connection = c.Value.Connection;
|
||||
|
||||
await connection.Lock.WaitAsync();
|
||||
|
||||
try
|
||||
{
|
||||
// Capture the connection state
|
||||
status = connection.Status;
|
||||
|
||||
lastSeenUtc = connection.LastSeenUtc;
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.Lock.Release();
|
||||
}
|
||||
|
||||
// Once the decision has been made to dispose we don't check the status again
|
||||
// But don't clean up connections while the debugger is attached.
|
||||
if (!Debugger.IsAttached && status == HttpConnectionContext.ConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
|
||||
{
|
||||
Log.ConnectionTimedOut(_logger, connection.ConnectionId);
|
||||
HttpConnectionsEventSource.Log.ConnectionTimedOut(connection.ConnectionId);
|
||||
|
||||
// This is most likely a long polling connection. The transport here ends because
|
||||
// a poll completed and has been inactive for > 5 seconds so we wait for the
|
||||
// application to finish gracefully
|
||||
_ = DisposeAndRemoveAsync(connection, closeGracefully: true);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Tick the heartbeat, if the connection is still active
|
||||
connection.TickHeartbeat();
|
||||
}
|
||||
}
|
||||
|
||||
var elapsed = timer.GetElapsedTime();
|
||||
HttpConnectionsEventSource.Log.ScannedConnections(elapsed);
|
||||
Log.ScannedConnections(_logger, elapsed);
|
||||
}
|
||||
|
||||
public void CloseConnections()
|
||||
{
|
||||
lock (_executionLock)
|
||||
// Stop firing the timer
|
||||
_nextHeartbeat.Stop();
|
||||
|
||||
var tasks = new List<Task>();
|
||||
|
||||
// REVIEW: In the future we can consider a hybrid where we first try to wait for shutdown
|
||||
// for a certain time frame then after some grace period we shutdown more aggressively
|
||||
foreach (var c in _connections)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
|
||||
// Stop firing the timer
|
||||
_timer?.Dispose();
|
||||
|
||||
var tasks = new List<Task>();
|
||||
|
||||
// REVIEW: In the future we can consider a hybrid where we first try to wait for shutdown
|
||||
// for a certain time frame then after some grace period we shutdown more aggressively
|
||||
foreach (var c in _connections)
|
||||
{
|
||||
// We're shutting down so don't wait for closing the application
|
||||
tasks.Add(DisposeAndRemoveAsync(c.Value.Connection, closeGracefully: false));
|
||||
}
|
||||
|
||||
Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(5));
|
||||
// We're shutting down so don't wait for closing the application
|
||||
tasks.Add(DisposeAndRemoveAsync(c.Value.Connection, closeGracefully: false));
|
||||
}
|
||||
|
||||
Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(5));
|
||||
}
|
||||
|
||||
public async Task DisposeAndRemoveAsync(HttpConnectionContext connection, bool closeGracefully)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,109 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.Http.Connections.Internal
|
||||
{
|
||||
public class TimerAwaitable : IDisposable, ICriticalNotifyCompletion
|
||||
{
|
||||
private Timer _timer;
|
||||
private Action _callback;
|
||||
private static readonly Action _callbackCompleted = () => { };
|
||||
|
||||
private readonly TimeSpan _period;
|
||||
|
||||
private readonly TimeSpan _dueTime;
|
||||
private bool _disposed;
|
||||
private bool _running = true;
|
||||
private object _lockObj = new object();
|
||||
|
||||
public TimerAwaitable(TimeSpan dueTime, TimeSpan period)
|
||||
{
|
||||
_dueTime = dueTime;
|
||||
_period = period;
|
||||
}
|
||||
|
||||
public void Start()
|
||||
{
|
||||
if (_timer == null)
|
||||
{
|
||||
lock (_lockObj)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (_timer == null)
|
||||
{
|
||||
_timer = new Timer(state => ((TimerAwaitable)state).Tick(), this, _dueTime, _period);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public TimerAwaitable GetAwaiter() => this;
|
||||
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
|
||||
|
||||
public bool GetResult()
|
||||
{
|
||||
_callback = null;
|
||||
|
||||
return _running;
|
||||
}
|
||||
|
||||
private void Tick()
|
||||
{
|
||||
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
|
||||
continuation?.Invoke();
|
||||
}
|
||||
|
||||
public void OnCompleted(Action continuation)
|
||||
{
|
||||
if (ReferenceEquals(_callback, _callbackCompleted) ||
|
||||
ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
|
||||
{
|
||||
Task.Run(continuation);
|
||||
}
|
||||
}
|
||||
|
||||
public void UnsafeOnCompleted(Action continuation)
|
||||
{
|
||||
OnCompleted(continuation);
|
||||
}
|
||||
|
||||
public void Stop()
|
||||
{
|
||||
lock (_lockObj)
|
||||
{
|
||||
// Stop should be used to trigger the call to end the loop which disposes
|
||||
if (_disposed)
|
||||
{
|
||||
throw new ObjectDisposedException(GetType().FullName);
|
||||
}
|
||||
|
||||
_running = false;
|
||||
}
|
||||
|
||||
// Call tick here to make sure that we yield the callback,
|
||||
// if it's currently waiting, we don't need to wait for the next period
|
||||
Tick();
|
||||
}
|
||||
|
||||
void IDisposable.Dispose()
|
||||
{
|
||||
lock (_lockObj)
|
||||
{
|
||||
_disposed = true;
|
||||
|
||||
_timer?.Dispose();
|
||||
|
||||
_timer = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -449,7 +449,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
|
|||
|
||||
// The application is still running here because the poll is only killed
|
||||
// by the heartbeat so we pretend to do a scan and this should force the application task to complete
|
||||
manager.Scan();
|
||||
await manager.ScanAsync();
|
||||
|
||||
// The application task should complete gracefully
|
||||
await connection.ApplicationTask.OrTimeout();
|
||||
|
|
|
|||
|
|
@ -282,17 +282,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
|
|||
Assert.Equal(HttpConnectionContext.ConnectionStatus.Disposed, connection.Status);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ScanAfterDisposeNoops()
|
||||
{
|
||||
var connectionManager = CreateConnectionManager();
|
||||
var connection = connectionManager.CreateConnection();
|
||||
|
||||
connectionManager.CloseConnections();
|
||||
|
||||
connectionManager.Scan();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ApplicationLifetimeIsHookedUp()
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue