Move shutdown logic from transport to core (#1707)

* Move shutdown logic from transport to core
* Use weak references to track FrameConnections
This commit is contained in:
Stephen Halter 2017-04-19 18:08:36 -07:00 committed by GitHub
parent d40dbb81ea
commit a053ca4758
30 changed files with 500 additions and 364 deletions

View File

@ -12,7 +12,6 @@ using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
@ -22,7 +21,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private readonly FrameConnectionContext _context;
private readonly Frame _frame;
private readonly List<IConnectionAdapter> _connectionAdapters;
private readonly TaskCompletionSource<object> _frameStartedTcs = new TaskCompletionSource<object>();
private readonly TaskCompletionSource<bool> _frameStartedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<object> _socketClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private long _lastTimestamp;
private long _timeoutTimestamp = long.MaxValue;
@ -30,7 +30,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private AdaptedPipeline _adaptedPipeline;
private Stream _filteredStream;
private Task _adaptedPipelineTask = TaskCache.CompletedTask;
private Task _adaptedPipelineTask;
public FrameConnection(FrameConnectionContext context)
{
@ -91,6 +91,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
Log.ConnectionStop(ConnectionId);
KestrelEventSource.Log.ConnectionStop(this);
_socketClosedTcs.SetResult(null);
// The connection is already in the "aborted" state by this point, but we want to track it
// until RequestProcessingAsync completes for graceful shutdown.
@ -101,9 +102,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public async Task StopAsync()
{
await _frameStartedTcs.Task;
await _frame.StopAsync();
await _adaptedPipelineTask;
if (await _frameStartedTcs.Task)
{
await _frame.StopAsync();
await (_adaptedPipelineTask ?? Task.CompletedTask);
}
await _socketClosedTcs.Task;
}
public void Abort(Exception ex)
@ -111,6 +116,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_frame.Abort(ex);
}
public Task AbortAsync(Exception ex)
{
_frame.Abort(ex);
return StopAsync();
}
public void Timeout()
{
_frame.SetBadRequestState(RequestRejectionReason.RequestTimeout);
@ -152,7 +163,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
catch (Exception ex)
{
Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}.");
_frameStartedTcs.SetResult(null);
_frameStartedTcs.SetResult(false);
CloseRawPipes();
}
}
@ -185,7 +196,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
_lastTimestamp = _context.ServiceContext.SystemClock.UtcNow.Ticks;
_frame.Start();
_frameStartedTcs.SetResult(null);
_frameStartedTcs.SetResult(true);
}
public void Tick(DateTimeOffset now)

View File

@ -429,7 +429,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_requestProcessingStopping = true;
Input.CancelPendingRead();
return _requestProcessingTask ?? TaskCache.CompletedTask;
Debug.Assert(_requestProcessingTask != null);
return _requestProcessingTask ?? Task.CompletedTask;
}
private void CancelRequestAbortedToken()

View File

@ -6,14 +6,19 @@ using System.Collections.Concurrent;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
{
public class FrameConnectionManager : IHeartbeatHandler
public class FrameConnectionManager
{
private readonly ConcurrentDictionary<long, FrameConnection> _connections
= new ConcurrentDictionary<long, FrameConnection>();
private readonly ConcurrentDictionary<long, FrameConnectionReference> _connectionReferences = new ConcurrentDictionary<long, FrameConnectionReference>();
private readonly IKestrelTrace _trace;
public FrameConnectionManager(IKestrelTrace trace)
{
_trace = trace;
}
public void AddConnection(long id, FrameConnection connection)
{
if (!_connections.TryAdd(id, connection))
if (!_connectionReferences.TryAdd(id, new FrameConnectionReference(connection)))
{
throw new ArgumentException(nameof(id));
}
@ -21,17 +26,30 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
public void RemoveConnection(long id)
{
if (!_connections.TryRemove(id, out _))
if (!_connectionReferences.TryRemove(id, out _))
{
throw new ArgumentException(nameof(id));
}
}
public void OnHeartbeat(DateTimeOffset now)
public void Walk(Action<FrameConnection> callback)
{
foreach (var kvp in _connections)
foreach (var kvp in _connectionReferences)
{
kvp.Value.Tick(now);
var reference = kvp.Value;
if (reference.TryGetConnection(out var connection))
{
callback(connection);
}
else if (_connectionReferences.TryRemove(kvp.Key, out reference))
{
// It's safe to modify the ConcurrentDictionary in the foreach.
// The connection reference has become unrooted because the application never completed.
_trace.ApplicationNeverCompleted(reference.ConnectionId);
}
// If both conditions are false, the connection was removed during the heartbeat.
}
}
}

View File

@ -0,0 +1,53 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
{
public static class FrameConnectionManagerShutdownExtensions
{
public static async Task<bool> CloseAllConnectionsAsync(this FrameConnectionManager connectionManager, CancellationToken token)
{
var closeTasks = new List<Task>();
connectionManager.Walk(connection =>
{
closeTasks.Add(connection.StopAsync());
});
var allClosedTask = Task.WhenAll(closeTasks.ToArray());
return await Task.WhenAny(allClosedTask, CancellationTokenAsTask(token)).ConfigureAwait(false) == allClosedTask;
}
public static async Task<bool> AbortAllConnectionsAsync(this FrameConnectionManager connectionManager)
{
var abortTasks = new List<Task>();
var canceledException = new TaskCanceledException("Request processing didn't complete within the shutdown timeout.");
connectionManager.Walk(connection =>
{
abortTasks.Add(connection.AbortAsync(canceledException));
});
var allAbortedTask = Task.WhenAll(abortTasks.ToArray());
return await Task.WhenAny(allAbortedTask, Task.Delay(1000)).ConfigureAwait(false) == allAbortedTask;
}
private static Task CancellationTokenAsTask(CancellationToken token)
{
if (token.IsCancellationRequested)
{
return Task.CompletedTask;
}
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
token.Register(() => tcs.SetResult(null));
return tcs.Task;
}
}
}

View File

@ -0,0 +1,25 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
{
public class FrameConnectionReference
{
private readonly WeakReference<FrameConnection> _weakReference;
public FrameConnectionReference(FrameConnection connection)
{
_weakReference = new WeakReference<FrameConnection>(connection);
ConnectionId = connection.ConnectionId;
}
public string ConnectionId { get; }
public bool TryGetConnection(out FrameConnection connection)
{
return _weakReference.TryGetTarget(out connection);
}
}
}

View File

@ -0,0 +1,31 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
{
public class FrameHeartbeatManager : IHeartbeatHandler
{
private readonly FrameConnectionManager _connectionManager;
private readonly Action<FrameConnection> _walkCallback;
private DateTimeOffset _now;
public FrameHeartbeatManager(FrameConnectionManager connectionManager)
{
_connectionManager = connectionManager;
_walkCallback = WalkCallback;
}
public void OnHeartbeat(DateTimeOffset now)
{
_now = now;
_connectionManager.Walk(_walkCallback);
}
private void WalkCallback(FrameConnection connection)
{
connection.Tick(_now);
}
}
}

View File

@ -26,11 +26,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
void ConnectionHeadResponseBodyWrite(string connectionId, long count);
void NotAllConnectionsClosedGracefully();
void ConnectionBadRequest(string connectionId, BadHttpRequestException ex);
void ApplicationError(string connectionId, string traceIdentifier, Exception ex);
void NotAllConnectionsAborted();
void TimerSlow(TimeSpan interval, DateTimeOffset now);
void ApplicationNeverCompleted(string connectionId);
}
}

View File

@ -36,6 +36,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private static readonly Action<ILogger, string, int, Exception> _connectionDisconnectedWrite =
LoggerMessage.Define<string, int>(LogLevel.Debug, 15, @"Connection id ""{ConnectionId}"" write of ""{count}"" bytes to disconnected client.");
private static readonly Action<ILogger, Exception> _notAllConnectionsClosedGracefully =
LoggerMessage.Define(LogLevel.Debug, 16, "Some connections failed to close gracefully during server shutdown.");
private static readonly Action<ILogger, string, string, Exception> _connectionBadRequest =
LoggerMessage.Define<string, string>(LogLevel.Information, 17, @"Connection id ""{ConnectionId}"" bad request data: ""{message}""");
@ -45,8 +48,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private static readonly Action<ILogger, string, Exception> _requestProcessingError =
LoggerMessage.Define<string>(LogLevel.Information, 20, @"Connection id ""{ConnectionId}"" request processing ended abnormally.");
private static readonly Action<ILogger, Exception> _notAllConnectionsAborted =
LoggerMessage.Define(LogLevel.Debug, 21, "Some connections failed to abort during server shutdown.");
private static readonly Action<ILogger, TimeSpan, DateTimeOffset, Exception> _timerSlow =
LoggerMessage.Define<TimeSpan, DateTimeOffset>(LogLevel.Warning, 21, @"Heartbeat took longer than ""{interval}"" at ""{now}"".");
LoggerMessage.Define<TimeSpan, DateTimeOffset>(LogLevel.Warning, 22, @"Heartbeat took longer than ""{interval}"" at ""{now}"".");
private static readonly Action<ILogger, string, Exception> _applicationNeverCompleted =
LoggerMessage.Define<string>(LogLevel.Critical, 23, @"Connection id ""{ConnectionId}"" application never completed");
protected readonly ILogger _logger;
@ -100,6 +109,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_connectionHeadResponseBodyWrite(_logger, connectionId, count, null);
}
public void NotAllConnectionsClosedGracefully()
{
_notAllConnectionsClosedGracefully(_logger, null);
}
public void ConnectionBadRequest(string connectionId, BadHttpRequestException ex)
{
_connectionBadRequest(_logger, connectionId, ex.Message, ex);
@ -110,11 +124,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_requestProcessingError(_logger, connectionId, ex);
}
public void NotAllConnectionsAborted()
{
_notAllConnectionsAborted(_logger, null);
}
public virtual void TimerSlow(TimeSpan interval, DateTimeOffset now)
{
_timerSlow(_logger, interval, now, null);
}
public virtual void ApplicationNeverCompleted(string connectionId)
{
_applicationNeverCompleted(_logger, connectionId, null);
}
public virtual void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
=> _logger.Log(logLevel, eventId, state, exception, formatter);

View File

@ -20,46 +20,89 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
public class KestrelServer : IServer
{
private readonly List<ITransport> _transports = new List<ITransport>();
private readonly ILogger _logger;
private readonly Heartbeat _heartbeat;
private readonly IServerAddressesFeature _serverAddresses;
private readonly ITransportFactory _transportFactory;
private bool _isRunning;
private bool _hasStarted;
private int _stopped;
private Heartbeat _heartbeat;
public KestrelServer(
IOptions<KestrelServerOptions> options,
ITransportFactory transportFactory,
ILoggerFactory loggerFactory)
public KestrelServer(IOptions<KestrelServerOptions> options, ITransportFactory transportFactory, ILoggerFactory loggerFactory)
: this(transportFactory, CreateServiceContext(options, loggerFactory))
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
}
// For testing
internal KestrelServer(ITransportFactory transportFactory, ServiceContext serviceContext)
{
if (transportFactory == null)
{
throw new ArgumentNullException(nameof(transportFactory));
}
if (loggerFactory == null)
{
throw new ArgumentNullException(nameof(loggerFactory));
}
Options = options.Value ?? new KestrelServerOptions();
_transportFactory = transportFactory;
_logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel");
ServiceContext = serviceContext;
var frameHeartbeatManager = new FrameHeartbeatManager(serviceContext.ConnectionManager);
_heartbeat = new Heartbeat(
new IHeartbeatHandler[] { serviceContext.DateHeaderValueManager, frameHeartbeatManager },
serviceContext.SystemClock, Trace);
Features = new FeatureCollection();
_serverAddresses = new ServerAddressesFeature();
Features.Set(_serverAddresses);
}
private static ServiceContext CreateServiceContext(IOptions<KestrelServerOptions> options, ILoggerFactory loggerFactory)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
if (loggerFactory == null)
{
throw new ArgumentNullException(nameof(loggerFactory));
}
var serverOptions = options.Value ?? new KestrelServerOptions();
var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel");
var trace = new KestrelTrace(logger);
var connectionManager = new FrameConnectionManager(trace);
var systemClock = new SystemClock();
var dateHeaderValueManager = new DateHeaderValueManager(systemClock);
IThreadPool threadPool;
if (serverOptions.UseTransportThread)
{
threadPool = new InlineLoggingThreadPool(trace);
}
else
{
threadPool = new LoggingThreadPool(trace);
}
return new ServiceContext
{
Log = trace,
HttpParserFactory = frameParser => new HttpParser<FrameAdapter>(frameParser.Frame.ServiceContext.Log),
ThreadPool = threadPool,
SystemClock = systemClock,
DateHeaderValueManager = dateHeaderValueManager,
ConnectionManager = connectionManager,
ServerOptions = serverOptions
};
}
public IFeatureCollection Features { get; }
public KestrelServerOptions Options { get; }
public KestrelServerOptions Options => ServiceContext.ServerOptions;
private ServiceContext ServiceContext { get; }
private IKestrelTrace Trace => ServiceContext.Log;
private FrameConnectionManager ConnectionManager => ServiceContext.ConnectionManager;
public async Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken)
{
@ -72,55 +115,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
ValidateOptions();
if (_isRunning)
if (_hasStarted)
{
// The server has already started and/or has not been cleaned up yet
throw new InvalidOperationException("Server has already started.");
}
_isRunning = true;
var trace = new KestrelTrace(_logger);
var systemClock = new SystemClock();
var dateHeaderValueManager = new DateHeaderValueManager(systemClock);
var connectionManager = new FrameConnectionManager();
_heartbeat = new Heartbeat(new IHeartbeatHandler[] { dateHeaderValueManager, connectionManager }, systemClock, trace);
IThreadPool threadPool;
if (Options.UseTransportThread)
{
threadPool = new InlineLoggingThreadPool(trace);
}
else
{
threadPool = new LoggingThreadPool(trace);
}
var serviceContext = new ServiceContext
{
Log = trace,
HttpParserFactory = frameParser => new HttpParser<FrameAdapter>(frameParser.Frame.ServiceContext.Log),
ThreadPool = threadPool,
SystemClock = systemClock,
DateHeaderValueManager = dateHeaderValueManager,
ConnectionManager = connectionManager,
ServerOptions = Options
};
_hasStarted = true;
async Task OnBind(ListenOptions endpoint)
{
var connectionHandler = new ConnectionHandler<TContext>(endpoint, serviceContext, application);
var connectionHandler = new ConnectionHandler<TContext>(endpoint, ServiceContext, application);
var transport = _transportFactory.Create(endpoint, connectionHandler);
_transports.Add(transport);
await transport.BindAsync().ConfigureAwait(false);
}
await AddressBinder.BindAsync(_serverAddresses, Options.ListenOptions, _logger, OnBind).ConfigureAwait(false);
await AddressBinder.BindAsync(_serverAddresses, Options.ListenOptions, Trace, OnBind).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogCritical(0, ex, "Unable to start Kestrel.");
Trace.LogCritical(0, ex, "Unable to start Kestrel.");
Dispose();
throw;
}
@ -134,22 +149,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
return;
}
if (_transports != null)
var tasks = new Task[_transports.Count];
for (int i = 0; i < _transports.Count; i++)
{
var tasks = new Task[_transports.Count];
for (int i = 0; i < _transports.Count; i++)
{
tasks[i] = _transports[i].UnbindAsync();
}
await Task.WhenAll(tasks).ConfigureAwait(false);
// TODO: Do transport-agnostic connection management/shutdown.
for (int i = 0; i < _transports.Count; i++)
{
tasks[i] = _transports[i].StopAsync();
}
await Task.WhenAll(tasks).ConfigureAwait(false);
tasks[i] = _transports[i].UnbindAsync();
}
await Task.WhenAll(tasks).ConfigureAwait(false);
if (!await ConnectionManager.CloseAllConnectionsAsync(cancellationToken).ConfigureAwait(false))
{
Trace.NotAllConnectionsClosedGracefully();
if (!await ConnectionManager.AbortAllConnectionsAsync().ConfigureAwait(false))
{
Trace.NotAllConnectionsAborted();
}
}
for (int i = 0; i < _transports.Count; i++)
{
tasks[i] = _transports[i].StopAsync();
}
await Task.WhenAll(tasks).ConfigureAwait(false);
_heartbeat?.Dispose();
}

View File

@ -13,9 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions
IPipeWriter Input { get; }
IPipeReader Output { get; }
// TODO: Remove these (Use Pipes instead?)
// TODO: Remove these (Use Pipes Tasks instead?)
void OnConnectionClosed();
Task StopAsync();
void Abort(Exception ex);
}
}

View File

@ -23,9 +23,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
void ConnectionError(string connectionId, Exception ex);
void ConnectionReset(string connectionId);
void NotAllConnectionsClosedGracefully();
void NotAllConnectionsAborted();
}
}

View File

@ -26,14 +26,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private readonly UvStreamHandle _socket;
private IConnectionContext _connectionContext;
private TaskCompletionSource<object> _socketClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private WritableBuffer? _currentWritableBuffer;
public LibuvConnection(ListenerContext context, UvStreamHandle socket) : base(context)
{
_socket = socket;
socket.Connection = this;
var tcpHandle = _socket as UvTcpHandle;
if (tcpHandle != null)
@ -86,29 +83,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
// Ensure the socket is disposed prior to completing in the input writer.
_socket.Dispose();
Input.Complete(new TaskCanceledException("The request was aborted"));
_socketClosedTcs.TrySetResult(null);
_connectionContext.OnConnectionClosed();
}
}
catch (Exception e)
{
Log.LogCritical(0, e, $"{nameof(LibuvConnection)}.{nameof(Start)}() {ConnectionId}");
}
finally
{
_connectionContext.OnConnectionClosed();
}
}
public async Task StopAsync()
{
await _connectionContext.StopAsync();
await _socketClosedTcs.Task;
}
public Task AbortAsync(Exception error)
{
_connectionContext.Abort(error);
return StopAsync();
}
// Called on Libuv thread
@ -201,7 +182,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
if (!normalRead)
{
var ignore = AbortAsync(error);
_connectionContext.Abort(error);
// Complete after aborting the connection
Input.Complete(error);

View File

@ -1,80 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
public class LibuvConnectionManager
{
private readonly LibuvThread _thread;
public LibuvConnectionManager(LibuvThread thread)
{
_thread = thread;
}
public async Task<bool> WalkConnectionsAndCloseAsync(TimeSpan timeout)
{
return await WalkConnectionsAsync((connectionManager, tcs) => connectionManager.WalkConnectionsAndCloseCore(tcs), timeout).ConfigureAwait(false);
}
public async Task<bool> WalkConnectionsAndAbortAsync(TimeSpan timeout)
{
return await WalkConnectionsAsync((connectionManager, tcs) => connectionManager.WalkConnectionsAndAbortCore(tcs), timeout).ConfigureAwait(false);
}
private async Task<bool> WalkConnectionsAsync(Action<LibuvConnectionManager, TaskCompletionSource<object>> action, TimeSpan timeout)
{
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_thread.Post(state => action(state, tcs), this);
return await Task.WhenAny(tcs.Task, Task.Delay(timeout)).ConfigureAwait(false) == tcs.Task;
}
private void WalkConnectionsAndCloseCore(TaskCompletionSource<object> tcs)
{
WalkConnectionsCore(connection => connection.StopAsync(), tcs);
}
private void WalkConnectionsAndAbortCore(TaskCompletionSource<object> tcs)
{
WalkConnectionsCore(connection => connection.AbortAsync(error: null), tcs);
}
private void WalkConnectionsCore(Func<LibuvConnection, Task> action, TaskCompletionSource<object> tcs)
{
var tasks = new List<Task>();
_thread.Walk(ptr =>
{
var handle = UvMemory.FromIntPtr<UvHandle>(ptr);
var connection = (handle as UvStreamHandle)?.Connection;
if (connection != null)
{
tasks.Add(action(connection));
}
});
Task.Run(() =>
{
try
{
Task.WaitAll(tasks.ToArray());
}
catch (Exception ex)
{
tcs.SetException(ex);
return;
}
tcs.SetResult(null);
});
}
}
}

View File

@ -38,14 +38,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private bool _initCompleted = false;
private ExceptionDispatchInfo _closeError;
private readonly ILibuvTrace _log;
private readonly TimeSpan _shutdownTimeout;
public LibuvThread(LibuvTransport transport)
{
_transport = transport;
_appLifetime = transport.AppLifetime;
_log = transport.Log;
_shutdownTimeout = transport.TransportOptions.ShutdownTimeout;
_loop = new UvLoopHandle(_log);
_post = new UvAsyncHandle(_log);
_thread = new Thread(ThreadStart);
@ -59,7 +57,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
QueueCloseAsyncHandle = EnqueueCloseHandle;
PipeFactory = new PipeFactory();
WriteReqPool = new WriteReqPool(this, _log);
ConnectionManager = new LibuvConnectionManager(this);
}
// For testing
@ -73,8 +70,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
public PipeFactory PipeFactory { get; }
public LibuvConnectionManager ConnectionManager { get; }
public WriteReqPool WriteReqPool { get; }
#if DEBUG
@ -87,9 +82,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private Action<Action<IntPtr>, IntPtr> QueueCloseAsyncHandle { get; }
// The cached result of Loop.Now() which is a timestamp in milliseconds
private long Now { get; set; }
public Task StartAsync()
{
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
@ -109,10 +101,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
if (!_threadTcs.Task.IsCompleted)
{
// These operations need to run on the libuv thread so it only makes
// sense to attempt execution if it's still running
await DisposeConnectionsAsync().ConfigureAwait(false);
var stepTimeout = TimeSpan.FromTicks(timeout.Ticks / 3);
Post(t => t.AllowStop());
@ -161,20 +149,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
#endif
private async Task DisposeConnectionsAsync()
{
// Close and wait for all connections
if (!await ConnectionManager.WalkConnectionsAndCloseAsync(_shutdownTimeout).ConfigureAwait(false))
{
_log.NotAllConnectionsClosedGracefully();
if (!await ConnectionManager.WalkConnectionsAndAbortAsync(TimeSpan.FromSeconds(1)).ConfigureAwait(false))
{
_log.NotAllConnectionsAborted();
}
}
}
private void AllowStop()
{
_post.Unreference();
@ -285,9 +259,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
}
// This is used to access a 64-bit timestamp (this.Now) using a potentially 32-bit IntPtr.
var thisHandle = GCHandle.Alloc(this, GCHandleType.Weak);
try
{
_loop.Run();
@ -317,7 +288,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
PipeFactory.Dispose();
WriteReqPool.Dispose();
thisHandle.Free();
_threadTcs.SetResult(null);
#if DEBUG
@ -434,6 +404,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
public static readonly Action<object, object> PostCallbackAdapter = (callback, state) => ((Action<T>)callback).Invoke((T)state);
public static readonly Action<object, object> PostAsyncCallbackAdapter = (callback, state) => ((Action<T>)callback).Invoke((T)state);
}
}
}

View File

@ -26,15 +26,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private static readonly Action<ILogger, string, Exception> _connectionError =
LoggerMessage.Define<string>(LogLevel.Information, 14, @"Connection id ""{ConnectionId}"" communication error.");
private static readonly Action<ILogger, Exception> _notAllConnectionsClosedGracefully =
LoggerMessage.Define(LogLevel.Debug, 16, "Some connections failed to close gracefully during server shutdown.");
private static readonly Action<ILogger, string, Exception> _connectionReset =
LoggerMessage.Define<string>(LogLevel.Debug, 19, @"Connection id ""{ConnectionId}"" reset.");
private static readonly Action<ILogger, Exception> _notAllConnectionsAborted =
LoggerMessage.Define(LogLevel.Debug, 21, "Some connections failed to abort during server shutdown.");
private readonly ILogger _logger;
public LibuvTrace(ILogger logger)
@ -85,16 +79,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
_connectionReset(_logger, connectionId, null);
}
public void NotAllConnectionsClosedGracefully()
{
_notAllConnectionsClosedGracefully(_logger, null);
}
public void NotAllConnectionsAborted()
{
_notAllConnectionsAborted(_logger, null);
}
public IDisposable BeginScope<TState>(TState state) => _logger.BeginScope(state);
public bool IsEnabled(LogLevel logLevel) => _logger.IsEnabled(logLevel);

View File

@ -27,8 +27,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networkin
{
}
public LibuvConnection Connection { get; set; }
protected override bool ReleaseHandle()
{
if (_listenVitality.IsAllocated)

View File

@ -18,17 +18,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv
/// </remarks>
public int ThreadCount { get; set; } = ProcessorThreadCount;
// TODO: Move all shutdown timeout logic back into core project.
/// <summary>
/// The amount of time after the server begins shutting down before connections will be forcefully closed.
/// Kestrel will wait for the duration of the timeout for any ongoing request processing to complete before
/// terminating the connection. No new connections or requests will be accepted during this time.
/// </summary>
/// <remarks>
/// Defaults to 5 seconds.
/// </remarks>
public TimeSpan ShutdownTimeout { get; set; } = TimeSpan.FromSeconds(5);
private static int ProcessorThreadCount
{
get

View File

@ -0,0 +1,61 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Runtime.CompilerServices;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Testing;
using Moq;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
public class FrameConnectionManagerTests
{
[Fact]
public void UnrootedConnectionsGetRemovedFromHeartbeat()
{
var connectionId = "0";
var trace = new Mock<IKestrelTrace>();
var frameConnectionManager = new FrameConnectionManager(trace.Object);
// Create FrameConnection in inner scope so it doesn't get rooted by the current frame.
UnrootedConnectionsGetRemovedFromHeartbeatInnerScope(connectionId, frameConnectionManager, trace);
GC.Collect();
GC.WaitForPendingFinalizers();
var connectionCount = 0;
frameConnectionManager.Walk(_ => connectionCount++);
Assert.Equal(0, connectionCount);
trace.Verify(t => t.ApplicationNeverCompleted(connectionId), Times.Once());
}
[MethodImpl(MethodImplOptions.NoInlining)]
private void UnrootedConnectionsGetRemovedFromHeartbeatInnerScope(
string connectionId,
FrameConnectionManager frameConnectionManager,
Mock<IKestrelTrace> trace)
{
var serviceContext = new TestServiceContext
{
ConnectionManager = frameConnectionManager
};
// The FrameConnection constructor adds itself to the connection manager.
var ignore = new FrameConnection(new FrameConnectionContext
{
ServiceContext = serviceContext,
ConnectionId = connectionId
});
var connectionCount = 0;
frameConnectionManager.Walk(_ => connectionCount++);
Assert.Equal(1, connectionCount);
trace.Verify(t => t.ApplicationNeverCompleted(connectionId), Times.Never());
}
}
}

View File

@ -446,12 +446,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var connectionControl = new Mock<ITimeoutControl>();
_frame.TimeoutControl = connectionControl.Object;
var requestProcessingTask = _frame.RequestProcessingAsync();
_frame.Start();
var expectedKeepAliveTimeout = _serviceContext.ServerOptions.Limits.KeepAliveTimeout.Ticks;
connectionControl.Verify(cc => cc.SetTimeout(expectedKeepAliveTimeout, TimeoutAction.CloseConnection));
_frame.StopAsync();
var requestProcessingTask = _frame.StopAsync();
_input.Writer.Complete();
requestProcessingTask.Wait();

View File

@ -42,7 +42,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public void ExceptionFromHeartbeatHandlerIsLoggedAsError()
{
var systemClock = new MockSystemClock();
var heartbeatInterval = TimeSpan.FromSeconds(1);
var heartbeatInterval = TimeSpan.FromMilliseconds(10);
var heartbeatHandler = new Mock<IHeartbeatHandler>();
var kestrelTrace = new TestKestrelTrace();
var ex = new Exception();
@ -54,7 +54,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Assert.True(kestrelTrace.Logger.MessageLoggedTask.Wait(TimeSpan.FromSeconds(10)));
}
Assert.Equal(ex, kestrelTrace.Logger.Messages.Single(message => message.LogLevel == LogLevel.Error).Exception);
Assert.Equal(ex, kestrelTrace.Logger.Messages.First(message => message.LogLevel == LogLevel.Error).Exception);
}
}
}

View File

@ -0,0 +1,102 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Testing;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
public class FrameConnectionManagerTests
{
private const int _applicationNeverCompletedId = 23;
[ConditionalFact]
[NoDebuggerCondition]
public async Task CriticalErrorLoggedIfApplicationDoesntComplete()
{
////////////////////////////////////////////////////////////////////////////////////////
// WARNING: This test will fail under a debugger because Task.s_currentActiveTasks //
// roots FrameConnection. //
////////////////////////////////////////////////////////////////////////////////////////
var logWh = new SemaphoreSlim(0);
var appStartedWh = new SemaphoreSlim(0);
var mockLogger = new Mock<ILogger>();
mockLogger
.Setup(logger => logger.IsEnabled(It.IsAny<LogLevel>()))
.Returns(true);
mockLogger
.Setup(logger => logger.Log(LogLevel.Critical, _applicationNeverCompletedId, It.IsAny<object>(), null,
It.IsAny<Func<object, Exception, string>>()))
.Callback(() =>
{
logWh.Release();
});
var mockLoggerFactory = new Mock<ILoggerFactory>();
mockLoggerFactory
.Setup(factory => factory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel"))
.Returns(mockLogger.Object);
mockLoggerFactory
.Setup(factory => factory.CreateLogger(It.IsNotIn("Microsoft.AspNetCore.Server.Kestrel")))
.Returns(Mock.Of<ILogger>());
var builder = new WebHostBuilder()
.UseLoggerFactory(mockLoggerFactory.Object)
.UseKestrel()
.UseUrls("http://127.0.0.1:0")
.Configure(app =>
{
app.Run(context =>
{
appStartedWh.Release();
var tcs = new TaskCompletionSource<object>();
return tcs.Task;
});
});
using (var host = builder.Build())
{
host.Start();
using (var connection = new TestConnection(host.GetPort()))
{
await connection.Send("GET / HTTP/1.1",
"Host:",
"",
"");
Assert.True(await appStartedWh.WaitAsync(TimeSpan.FromSeconds(10)));
// Close connection without waiting for a response
}
var logWaitAttempts = 0;
for (; !await logWh.WaitAsync(TimeSpan.FromSeconds(1)) && logWaitAttempts < 10; logWaitAttempts++)
{
GC.Collect();
GC.WaitForPendingFinalizers();
}
Assert.True(logWaitAttempts < 10);
}
}
private class NoDebuggerConditionAttribute : Attribute, ITestCondition
{
public bool IsMet => !Debugger.IsAttached;
public string SkipReason => "A debugger is attached.";
}
}
}

View File

@ -7,8 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.TestHelpers;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Testing;
using Xunit;
@ -51,7 +50,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
});
}
private async Task ConnectionClosedWhenKeepAliveTimeoutExpires(TimeoutTestServer server)
private async Task ConnectionClosedWhenKeepAliveTimeoutExpires(TestServer server)
{
using (var connection = server.CreateConnection())
{
@ -65,7 +64,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task ConnectionKeptAliveBetweenRequests(TimeoutTestServer server)
private async Task ConnectionKeptAliveBetweenRequests(TestServer server)
{
using (var connection = server.CreateConnection())
{
@ -86,7 +85,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task ConnectionNotTimedOutWhileRequestBeingSent(TimeoutTestServer server)
private async Task ConnectionNotTimedOutWhileRequestBeingSent(TestServer server)
{
using (var connection = server.CreateConnection())
{
@ -117,7 +116,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task ConnectionNotTimedOutWhileAppIsRunning(TimeoutTestServer server, CancellationTokenSource cts)
private async Task ConnectionNotTimedOutWhileAppIsRunning(TestServer server, CancellationTokenSource cts)
{
using (var connection = server.CreateConnection())
{
@ -144,7 +143,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task ConnectionTimesOutWhenOpenedButNoRequestSent(TimeoutTestServer server)
private async Task ConnectionTimesOutWhenOpenedButNoRequestSent(TestServer server)
{
using (var connection = server.CreateConnection())
{
@ -153,7 +152,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task KeepAliveTimeoutDoesNotApplyToUpgradedConnections(TimeoutTestServer server, CancellationTokenSource cts)
private async Task KeepAliveTimeoutDoesNotApplyToUpgradedConnections(TestServer server, CancellationTokenSource cts)
{
using (var connection = server.CreateConnection())
{
@ -181,12 +180,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private TimeoutTestServer CreateServer(CancellationToken longRunningCt, CancellationToken upgradeCt)
private TestServer CreateServer(CancellationToken longRunningCt, CancellationToken upgradeCt)
{
return new TimeoutTestServer(httpContext => App(httpContext, longRunningCt, upgradeCt), new KestrelServerOptions
return new TestServer(httpContext => App(httpContext, longRunningCt, upgradeCt), new TestServiceContext
{
AddServerHeader = false,
Limits = { KeepAliveTimeout = KeepAliveTimeout }
// Use real SystemClock so timeouts trigger.
SystemClock = new SystemClock(),
ServerOptions =
{
AddServerHeader = false,
Limits = { KeepAliveTimeout = KeepAliveTimeout }
}
});
}

View File

@ -15,7 +15,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
public class LoggingConnectionAdapterTests
{
[Fact]
public async Task LoggingConnectionFilterCanBeAddedBeforeAndAfterHttpsFilter()
public async Task LoggingConnectionAdapterCanBeAddedBeforeAndAfterHttpsAdapter()
{
var host = new WebHostBuilder()
.UseKestrel(options =>
@ -24,6 +24,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
listenOptions.UseConnectionLogging();
listenOptions.UseHttps(TestResources.TestCertificatePath, "testPassword");
listenOptions.UseConnectionLogging();
});
})
.Configure(app =>

View File

@ -287,7 +287,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
bytesRead += await context.Request.Body.ReadAsync(buffer, bytesRead, buffer.Length - bytesRead);
}
await clientFinishedSendingRequestBody.Task.TimeoutAfter(TimeSpan.FromSeconds(30));
await clientFinishedSendingRequestBody.Task.TimeoutAfter(TimeSpan.FromSeconds(120));
// Verify client didn't send extra bytes
if (context.Request.Body.ReadByte() != -1)
@ -323,7 +323,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
// Timeouts large enough to prevent false positives, but small enough to fail quickly.
socket.SendTimeout = 10 * 1000;
socket.ReceiveTimeout = 30 * 1000;
socket.ReceiveTimeout = 120 * 1000;
socket.Connect(IPAddress.Loopback, port);

View File

@ -5,8 +5,7 @@ using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.TestHelpers;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Testing;
using Xunit;
@ -38,7 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task ConnectionAbortedWhenRequestHeadersNotReceivedInTime(TimeoutTestServer server, string headers)
private async Task ConnectionAbortedWhenRequestHeadersNotReceivedInTime(TestServer server, string headers)
{
using (var connection = server.CreateConnection())
{
@ -49,7 +48,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task RequestHeadersTimeoutCanceledAfterHeadersReceived(TimeoutTestServer server)
private async Task RequestHeadersTimeoutCanceledAfterHeadersReceived(TestServer server)
{
using (var connection = server.CreateConnection())
{
@ -66,7 +65,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task ConnectionAbortedWhenRequestLineNotReceivedInTime(TimeoutTestServer server, string requestLine)
private async Task ConnectionAbortedWhenRequestLineNotReceivedInTime(TestServer server, string requestLine)
{
using (var connection = server.CreateConnection())
{
@ -75,7 +74,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private async Task TimeoutNotResetOnEachRequestLineCharacterReceived(TimeoutTestServer server)
private async Task TimeoutNotResetOnEachRequestLineCharacterReceived(TestServer server)
{
using (var connection = server.CreateConnection())
{
@ -90,21 +89,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
private TimeoutTestServer CreateServer()
private TestServer CreateServer()
{
return new TimeoutTestServer(async httpContext =>
{
await httpContext.Request.Body.ReadAsync(new byte[1], 0, 1);
await httpContext.Response.WriteAsync("hello, world");
},
new KestrelServerOptions
return new TestServer(async httpContext =>
{
await httpContext.Request.Body.ReadAsync(new byte[1], 0, 1);
await httpContext.Response.WriteAsync("hello, world");
},
new TestServiceContext
{
// Use real SystemClock so timeouts trigger.
SystemClock = new SystemClock(),
ServerOptions =
{
AddServerHeader = false,
Limits =
{
RequestHeadersTimeout = RequestHeadersTimeout
}
});
Limits = { RequestHeadersTimeout = RequestHeadersTimeout }
}
});
}
private async Task ReceiveResponse(TestConnection connection)

View File

@ -1,50 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Threading;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Options;
namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.TestHelpers
{
public class TimeoutTestServer : IDisposable
{
private readonly KestrelServer _server;
private readonly ListenOptions _listenOptions;
public TimeoutTestServer(RequestDelegate app, KestrelServerOptions serverOptions)
{
var loggerFactory = new KestrelTestLoggerFactory(new TestApplicationErrorLogger());
var libuvTransportFactory = new LibuvTransportFactory(Options.Create(new LibuvTransportOptions()), new LifetimeNotImplemented(), loggerFactory);
_listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
serverOptions.ListenOptions.Add(_listenOptions);
_server = new KestrelServer(Options.Create(serverOptions), libuvTransportFactory, loggerFactory);
try
{
_server.StartAsync(new DummyApplication(app), CancellationToken.None).GetAwaiter().GetResult();
}
catch
{
_server.Dispose();
throw;
}
}
public TestConnection CreateConnection()
{
return new TestConnection(_listenOptions.IPEndPoint.Port, _listenOptions.IPEndPoint.AddressFamily);
}
public void Dispose()
{
_server?.Dispose();
}
}
}

View File

@ -4,9 +4,9 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
@ -20,7 +20,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
/// </summary>
public class TestServer : IDisposable
{
private ITransport _transport;
private static TimeSpan _shutdownTimeout = TimeSpan.FromSeconds(5);
private KestrelServer _server;
private ListenOptions _listenOptions;
public TestServer(RequestDelegate app)
@ -48,36 +50,30 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
_listenOptions = listenOptions;
Context = context;
context.ServerOptions.ListenOptions.Add(_listenOptions);
// Switch this to test on socket transport
var transportFactory = CreateLibuvTransportFactory(context);
// var transportFactory = CreateSocketTransportFactory(context);
_transport = transportFactory.Create(listenOptions, new ConnectionHandler<HttpContext>(listenOptions, context, new DummyApplication(app, httpContextFactory)));
_server = new KestrelServer(transportFactory, context);
var httpApplication = new DummyApplication(app, httpContextFactory);
try
{
_transport.BindAsync().Wait();
_server.StartAsync(httpApplication, CancellationToken.None).Wait();
}
catch
{
if (_transport != null)
{
_transport.UnbindAsync().Wait();
_transport.StopAsync().Wait();
_transport = null;
}
_server.StopAsync(new CancellationTokenSource(_shutdownTimeout).Token).Wait();
_server.Dispose();
throw;
}
}
private static ITransportFactory CreateLibuvTransportFactory(TestServiceContext context)
{
var transportOptions = new LibuvTransportOptions()
{
ThreadCount = 1,
ShutdownTimeout = TimeSpan.FromSeconds(5)
};
var transportOptions = new LibuvTransportOptions { ThreadCount = 1 };
var transportFactory = new LibuvTransportFactory(
Options.Create(transportOptions),
@ -90,7 +86,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
private static ITransportFactory CreateSocketTransportFactory(TestServiceContext context)
{
var options = new SocketTransportOptions();
return new SocketTransportFactory(Options.Create(options));
}
@ -107,8 +102,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
public void Dispose()
{
_transport.UnbindAsync().Wait();
_transport.StopAsync().Wait();
_server.StopAsync(new CancellationTokenSource(_shutdownTimeout).Token).Wait();
_server.Dispose();
}
}
}

View File

@ -34,6 +34,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
public void NotAllConnectionsAborted() { }
public void NotAllConnectionsClosedGracefully() { }
public void RequestProcessingError(string connectionId, Exception ex) { }
public virtual void TimerSlow(TimeSpan interval, DateTimeOffset now) { }
public void TimerSlow(TimeSpan interval, DateTimeOffset now) { }
public void ApplicationNeverCompleted(string connectionId) { }
}
}

View File

@ -16,11 +16,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
AppLifetime = new LifetimeNotImplemented();
ConnectionHandler = new MockConnectionHandler();
Log = new LibuvTrace(logger);
Options = new LibuvTransportOptions
{
ThreadCount = 1,
ShutdownTimeout = TimeSpan.FromSeconds(5)
};
Options = new LibuvTransportOptions { ThreadCount = 1 };
}
}
}

View File

@ -18,7 +18,7 @@ namespace Microsoft.AspNetCore.Testing
ThreadPool = new LoggingThreadPool(Log);
SystemClock = new MockSystemClock();
DateHeaderValueManager = new DateHeaderValueManager(SystemClock);
ConnectionManager = new FrameConnectionManager();
ConnectionManager = new FrameConnectionManager(Log);
DateHeaderValue = DateHeaderValueManager.GetDateHeaderValues().String;
HttpParserFactory = frameAdapter => new HttpParser<FrameAdapter>(frameAdapter.Frame.ServiceContext.Log);
ServerOptions = new KestrelServerOptions