Allocate things per KestrelThread instead of per listener

- Moved things that have loop affinity to KestrelThread like
WriteReqPool, MemoryPool and the ConnectionManager
- Changed on the listeners to only kill the ListenSocket, not the
connections on dispose
- Moved connection disposal to KestrelThread.Stop
- Simplify the connection manager logic so it's possible to walk and
wait in a single call
This commit is contained in:
David Fowler 2016-07-15 00:48:01 -07:00
parent 24fa5c0264
commit 0e7967a7fc
10 changed files with 161 additions and 117 deletions

View File

@ -52,8 +52,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
_bufferSizeControl = new BufferSizeControl(ServerOptions.MaxRequestBufferSize.Value, this, Thread);
}
SocketInput = new SocketInput(Memory, ThreadPool, _bufferSizeControl);
SocketOutput = new SocketOutput(Thread, _socket, Memory, this, ConnectionId, Log, ThreadPool, WriteReqPool);
SocketInput = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
SocketOutput = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
var tcpHandle = _socket as UvTcpHandle;
if (tcpHandle != null)
@ -166,7 +166,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
{
if (_filterContext.Connection != _libuvStream)
{
_filteredStreamAdapter = new FilteredStreamAdapter(ConnectionId, _filterContext.Connection, Memory, Log, ThreadPool, _bufferSizeControl);
_filteredStreamAdapter = new FilteredStreamAdapter(ConnectionId, _filterContext.Connection, Thread.Memory, Log, ThreadPool, _bufferSizeControl);
_frame.SocketInput = _filteredStreamAdapter.SocketInput;
_frame.SocketOutput = _filteredStreamAdapter.SocketOutput;

View File

@ -3,30 +3,36 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking;
namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
{
public class ConnectionManager
{
private KestrelThread _thread;
private List<Task> _connectionStopTasks;
private readonly KestrelThread _thread;
private readonly IThreadPool _threadPool;
public ConnectionManager(KestrelThread thread)
public ConnectionManager(KestrelThread thread, IThreadPool threadPool)
{
_thread = thread;
_threadPool = threadPool;
}
// This must be called on the libuv event loop
public void WalkConnectionsAndClose()
public bool WalkConnectionsAndClose(TimeSpan timeout)
{
if (_connectionStopTasks != null)
{
throw new InvalidOperationException($"{nameof(WalkConnectionsAndClose)} cannot be called twice.");
}
var wh = new ManualResetEventSlim();
_connectionStopTasks = new List<Task>();
_thread.Post(state => ((ConnectionManager)state).WalkConnectionsAndCloseCore(wh), this);
return wh.Wait(timeout);
}
private void WalkConnectionsAndCloseCore(ManualResetEventSlim wh)
{
var connectionStopTasks = new List<Task>();
_thread.Walk(ptr =>
{
@ -35,19 +41,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
if (connection != null)
{
_connectionStopTasks.Add(connection.StopAsync());
connectionStopTasks.Add(connection.StopAsync());
}
});
}
public Task WaitForConnectionCloseAsync()
{
if (_connectionStopTasks == null)
_threadPool.Run(() =>
{
throw new InvalidOperationException($"{nameof(WalkConnectionsAndClose)} must be called first.");
}
return Task.WhenAll(_connectionStopTasks);
Task.WaitAll(connectionStopTasks.ToArray());
wh.Set();
});
}
}
}

View File

@ -15,7 +15,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
{
private bool _closed;
protected Listener(ServiceContext serviceContext)
protected Listener(ServiceContext serviceContext)
: base(serviceContext)
{
}
@ -28,7 +28,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
{
ServerAddress = address;
Thread = thread;
ConnectionManager = new ConnectionManager(thread);
var tcs = new TaskCompletionSource<int>(this);
@ -57,7 +56,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
protected static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state)
{
var listener = (Listener) state;
var listener = (Listener)state;
if (error != null)
{
@ -97,22 +96,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
listener._closed = true;
listener.ConnectionManager.WalkConnectionsAndClose();
}, this).ConfigureAwait(false);
await ConnectionManager.WaitForConnectionCloseAsync().ConfigureAwait(false);
await Thread.PostAsync(state =>
{
var writeReqPool = ((Listener)state).WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();
}
}, this).ConfigureAwait(false);
}
Memory.Dispose();
ListenSocket = null;
}
}

View File

@ -16,8 +16,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
public ListenerContext(ServiceContext serviceContext)
: base(serviceContext)
{
Memory = new MemoryPool();
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
}
public ListenerContext(ListenerContext listenerContext)
@ -25,20 +23,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
{
ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread;
Memory = listenerContext.Memory;
ConnectionManager = listenerContext.ConnectionManager;
WriteReqPool = listenerContext.WriteReqPool;
Log = listenerContext.Log;
}
public ServerAddress ServerAddress { get; set; }
public KestrelThread Thread { get; set; }
public MemoryPool Memory { get; set; }
public ConnectionManager ConnectionManager { get; set; }
public Queue<UvWriteReq> WriteReqPool { get; set; }
}
}

View File

@ -39,8 +39,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
ServerAddress = address;
Thread = thread;
ConnectionManager = new ConnectionManager(thread);
DispatchPipe = new UvPipeHandle(Log);
var tcs = new TaskCompletionSource<int>(this);
@ -180,27 +178,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
listener._closed = true;
listener.ConnectionManager.WalkConnectionsAndClose();
}, this).ConfigureAwait(false);
await ConnectionManager.WaitForConnectionCloseAsync().ConfigureAwait(false);
await Thread.PostAsync(state =>
{
var listener = (ListenerSecondary)state;
var writeReqPool = listener.WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();
}
}, this).ConfigureAwait(false);
}
else
{
FreeBuffer();
}
Memory.Dispose();
}
}
}

View File

@ -14,8 +14,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
{
public class SocketOutput : ISocketOutput
{
public const int MaxPooledWriteReqs = 1024;
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
// Well behaved WriteAsync users should await returned task, so there is no need to allocate more per connection by default
@ -58,17 +56,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private WriteContext _nextWriteContext;
private readonly Queue<WaitingTask> _tasksPending;
private readonly Queue<WriteContext> _writeContextPool;
private readonly Queue<UvWriteReq> _writeReqPool;
private readonly WriteReqPool _writeReqPool;
public SocketOutput(
KestrelThread thread,
UvStreamHandle socket,
MemoryPool memory,
Connection connection,
string connectionId,
IKestrelTrace log,
IThreadPool threadPool,
Queue<UvWriteReq> writeReqPool)
IThreadPool threadPool)
{
_thread = thread;
_socket = socket;
@ -78,9 +74,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
_threadPool = threadPool;
_tasksPending = new Queue<WaitingTask>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = writeReqPool;
_writeReqPool = thread.WriteReqPool;
_head = memory.Lease();
_head = thread.Memory.Lease();
_tail = _head;
}
@ -585,15 +581,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;
if (Self._writeReqPool.Count > 0)
{
_writeReq = Self._writeReqPool.Dequeue();
}
else
{
_writeReq = new UvWriteReq(Self._log);
_writeReq.Init(Self._thread.Loop);
}
_writeReq = Self._writeReqPool.Allocate();
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
{
@ -691,14 +679,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private void PoolWriteReq(UvWriteReq writeReq)
{
if (Self._writeReqPool.Count < MaxPooledWriteReqs)
{
Self._writeReqPool.Enqueue(writeReq);
}
else
{
writeReq.Dispose();
}
Self._writeReqPool.Return(writeReq);
}
private void ScheduleReturnFullyWrittenBlocks()

View File

@ -7,6 +7,7 @@ using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking;
using Microsoft.Extensions.Logging;
@ -42,6 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
private ExceptionDispatchInfo _closeError;
private readonly IKestrelTrace _log;
private readonly IThreadPool _threadPool;
private readonly TimeSpan _shutdownTimeout;
public KestrelThread(KestrelEngine engine)
{
@ -49,6 +51,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
_appLifetime = engine.AppLifetime;
_log = engine.Log;
_threadPool = engine.ThreadPool;
_shutdownTimeout = engine.ServerOptions.ShutdownTimeout;
_loop = new UvLoopHandle(_log);
_post = new UvAsyncHandle(_log);
_thread = new Thread(ThreadStart);
@ -60,10 +63,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
#endif
QueueCloseHandle = PostCloseHandle;
QueueCloseAsyncHandle = EnqueueCloseHandle;
Memory = new MemoryPool();
WriteReqPool = new WriteReqPool(this, _log);
ConnectionManager = new ConnectionManager(this, _threadPool);
}
public UvLoopHandle Loop { get { return _loop; } }
public MemoryPool Memory { get; }
public ConnectionManager ConnectionManager { get; }
public WriteReqPool WriteReqPool { get; }
public ExceptionDispatchInfo FatalError { get { return _closeError; } }
public Action<Action<IntPtr>, IntPtr> QueueCloseHandle { get; }
@ -95,6 +107,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
if (_thread.IsAlive)
{
// These operations need to run on the libuv thread so it only makes
// sense to attempt execution if it's still running
DisposeConnections();
var stepTimeout = (int)(timeout.TotalMilliseconds / 2);
try
{
@ -125,6 +141,34 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal
}
}
private void DisposeConnections()
{
try
{
// Close and wait for all connections
if (!ConnectionManager.WalkConnectionsAndClose(_shutdownTimeout))
{
_log.LogError(0, null, "Waiting for connections timed out");
}
var result = PostAsync(state =>
{
var listener = (KestrelThread)state;
listener.WriteReqPool.Dispose();
},
this).Wait(_shutdownTimeout);
if (!result)
{
_log.LogError(0, null, "Disposing write requests failed");
}
}
finally
{
Memory.Dispose();
}
}
private void OnStopRude()
{
Walk(ptr =>

View File

@ -0,0 +1,73 @@
using System;
using System.Collections.Generic;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking;
namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure
{
public class WriteReqPool
{
private const int _maxPooledWriteReqs = 1024;
private readonly KestrelThread _thread;
private readonly Queue<UvWriteReq> _pool = new Queue<UvWriteReq>(_maxPooledWriteReqs);
private readonly IKestrelTrace _log;
private bool _disposed;
public WriteReqPool(KestrelThread thread, IKestrelTrace log)
{
_thread = thread;
_log = log;
}
public UvWriteReq Allocate()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().Name);
}
UvWriteReq req;
if (_pool.Count > 0)
{
req = _pool.Dequeue();
}
else
{
req = new UvWriteReq(_log);
req.Init(_thread.Loop);
}
return req;
}
public void Return(UvWriteReq req)
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().Name);
}
if (_pool.Count < _maxPooledWriteReqs)
{
_pool.Enqueue(req);
}
else
{
req.Dispose();
}
}
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
while (_pool.Count > 0)
{
_pool.Dequeue().Dispose();
}
}
}
}
}

View File

@ -17,7 +17,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
var mockLibuv = new MockLibuv();
using (var memory = new MemoryPool())
using (var engine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
engine.Start(count: 1);
@ -27,7 +26,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
FrameFactory = connectionContext => new Frame<HttpContext>(
new DummyApplication(httpContext => TaskUtilities.CompletedTask), connectionContext),
Memory = memory,
ServerAddress = ServerAddress.FromUrl("http://127.0.0.1:0"),
Thread = engine.Threads[0]
};

View File

@ -26,8 +26,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
// Arrange
var mockLibuv = new MockLibuv();
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -36,7 +34,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
var bufferSize = 1048576;
@ -78,7 +76,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -88,7 +85,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -152,7 +149,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -162,7 +158,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted / 2;
var data = new byte[bufferSize];
@ -231,7 +227,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -240,10 +235,10 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
using (var mockConnection = new MockConnection())
{
ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted;
@ -348,7 +343,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -361,7 +355,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
using (var mockConnection = new MockConnection())
{
var abortedSource = mockConnection.RequestAbortedSource;
ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted;
@ -441,7 +435,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -451,7 +444,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -534,7 +527,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -543,14 +535,14 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
// block 1
var start = socketOutput.ProducingStart();
start.Block.End = start.Block.Data.Offset + start.Block.Data.Count;
// block 2
var block2 = memory.Lease();
var block2 = kestrelThread.Memory.Lease();
block2.End = block2.Data.Offset + block2.Data.Count;
start.Block.Next = block2;
@ -586,7 +578,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -596,7 +587,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var buffer = new ArraySegment<byte>(new byte[1]);
@ -656,7 +647,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -665,7 +655,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
var blockThreadWh = new ManualResetEventSlim();
kestrelThread.Post(_ =>
@ -702,7 +692,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
var mockLibuv = new MockLibuv();
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
@ -712,7 +701,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var connection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, connection, "0", trace, ltp, new Queue<UvWriteReq>());
var socketOutput = new SocketOutput(kestrelThread, socket, connection, "0", trace, ltp);
// Close SocketOutput
var cleanupTask = socketOutput.WriteAsync(