diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index b49ffd4316..89b9db55e8 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connectionId = Interlocked.Increment(ref _lastConnectionId); _rawSocketInput = new SocketInput(Memory2, ThreadPool); - _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool); + _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool); } public void Start() diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs index 6721756266..5d8c4e8d0d 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs @@ -94,6 +94,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { var socket = (Listener)tcs2.Task.AsyncState; socket.ListenSocket.Dispose(); + + var writeReqPool = socket.WriteReqPool; + while (writeReqPool.Count > 0) + { + writeReqPool.Dequeue().Dispose(); + } + tcs2.SetResult(0); } catch (Exception ex) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs index fecd0afa44..7247afa2bd 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs @@ -1,8 +1,9 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -using Microsoft.AspNet.Http; +using System.Collections.Generic; using Microsoft.AspNet.Server.Kestrel.Infrastructure; +using Microsoft.AspNet.Server.Kestrel.Networking; namespace Microsoft.AspNet.Server.Kestrel.Http { @@ -17,6 +18,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http : base(serviceContext) { Memory2 = new MemoryPool2(); + WriteReqPool = new Queue(SocketOutput.MaxPooledWriteReqs); } public ListenerContext(ListenerContext listenerContext) @@ -25,6 +27,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http ServerAddress = listenerContext.ServerAddress; Thread = listenerContext.Thread; Memory2 = listenerContext.Memory2; + WriteReqPool = listenerContext.WriteReqPool; Log = listenerContext.Log; } @@ -33,5 +36,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public KestrelThread Thread { get; set; } public MemoryPool2 Memory2 { get; set; } + + public Queue WriteReqPool { get; set; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 13c690d246..9a0f9f8a14 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -14,9 +14,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { + public const int MaxPooledWriteReqs = 1024; + private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; + private const int _maxPooledWriteContexts = 32; private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); @@ -42,12 +45,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // The number of write operations that have been scheduled so far // but have not completed. private int _writesPending = 0; - private int _numBytesPreCompleted = 0; private Exception _lastWriteError; private WriteContext _nextWriteContext; private readonly Queue> _tasksPending; private readonly Queue> _tasksCompleted; + private readonly Queue _writeContextPool; + private readonly Queue _writeReqPool; public SocketOutput( KestrelThread thread, @@ -56,7 +60,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http Connection connection, long connectionId, IKestrelTrace log, - IThreadPool threadPool) + IThreadPool threadPool, + Queue writeReqPool) { _thread = thread; _socket = socket; @@ -66,6 +71,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); + _writeContextPool = new Queue(_maxPooledWriteContexts); + _writeReqPool = writeReqPool; _head = memory.Lease(); _tail = _head; @@ -92,7 +99,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { if (_nextWriteContext == null) { - _nextWriteContext = new WriteContext(this); + if (_writeContextPool.Count > 0) + { + _nextWriteContext = _writeContextPool.Dequeue(); + } + else + { + _nextWriteContext = new WriteContext(this); + } } if (socketShutdownSend) @@ -274,9 +288,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } // This is called on the libuv event loop - private void OnWriteCompleted(int bytesWritten, int status, Exception error) + private void OnWriteCompleted(WriteContext writeContext) { - _log.ConnectionWriteCallback(_connectionId, status); + var bytesWritten = writeContext.ByteCount; + var status = writeContext.WriteStatus; + var error = writeContext.WriteError; + if (error != null) { @@ -290,6 +307,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http lock (_contextLock) { + PoolWriteContext(writeContext); if (_nextWriteContext != null) { scheduleWrite = true; @@ -332,11 +350,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + _log.ConnectionWriteCallback(_connectionId, status); + if (scheduleWrite) { - // ScheduleWrite(); - // on right thread, fairness issues? - WriteAllPending(); + ScheduleWrite(); } _tasksCompleted.Clear(); @@ -367,6 +385,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + private void PoolWriteContext(WriteContext writeContext) + { + // called inside _contextLock + if (_writeContextPool.Count < _maxPooledWriteContexts) + { + writeContext.Reset(); + _writeContextPool.Enqueue(writeContext); + } + } + void ISocketOutput.Write(ArraySegment buffer, bool immediate) { var task = WriteAsync(buffer, immediate); @@ -412,19 +440,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state); + private SocketOutput Self; + private UvWriteReq _writeReq; private MemoryPoolIterator2 _lockedStart; private MemoryPoolIterator2 _lockedEnd; private int _bufferCount; - private int _byteCount; - - public SocketOutput Self; + public int ByteCount; public bool SocketShutdownSend; public bool SocketDisconnect; public int WriteStatus; public Exception WriteError; - public int ShutdownSendStatus; public WriteContext(SocketOutput self) @@ -439,27 +466,40 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { LockWrite(); - if (_byteCount == 0 || Self._socket.IsClosed) + if (ByteCount == 0 || Self._socket.IsClosed) { DoShutdownIfNeeded(); return; } - var writeReq = new UvWriteReq(Self._log); - writeReq.Init(Self._thread.Loop); + // Sample values locally in case write completes inline + // to allow block to be Reset and still complete this function + var lockedEndBlock = _lockedEnd.Block; + var lockedEndIndex = _lockedEnd.Index; - writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => + if (Self._writeReqPool.Count > 0) { - _writeReq.Dispose(); - var _this = (WriteContext)state; - _this.ScheduleReturnFullyWrittenBlocks(); - _this.WriteStatus = status; - _this.WriteError = error; - _this.DoShutdownIfNeeded(); + _writeReq = Self._writeReqPool.Dequeue(); + } + else + { + _writeReq = new UvWriteReq(Self._log); + _writeReq.Init(Self._thread.Loop); + } + + _writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => + { + var writeContext = (WriteContext)state; + writeContext.PoolWriteReq(writeContext._writeReq); + writeContext._writeReq = null; + writeContext.ScheduleReturnFullyWrittenBlocks(); + writeContext.WriteStatus = status; + writeContext.WriteError = error; + writeContext.DoShutdownIfNeeded(); }, this); - Self._head = _lockedEnd.Block; - Self._head.Start = _lockedEnd.Index; + Self._head = lockedEndBlock; + Self._head.Start = lockedEndIndex; } /// @@ -506,9 +546,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public void Complete() { - Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError); + Self.OnWriteCompleted(this); } - + + private void PoolWriteReq(UvWriteReq writeReq) + { + if (Self._writeReqPool.Count < MaxPooledWriteReqs) + { + Self._writeReqPool.Enqueue(writeReq); + } + else + { + writeReq.Dispose(); + } + } + private void ScheduleReturnFullyWrittenBlocks() { var block = _lockedStart.Block; @@ -556,7 +608,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _lockedStart = new MemoryPoolIterator2(head, head.Start); _lockedEnd = new MemoryPoolIterator2(tail, tail.End); - BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount); + BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); + } + + public void Reset() + { + _lockedStart = default(MemoryPoolIterator2); + _lockedEnd = default(MemoryPoolIterator2); + _bufferCount = 0; + ByteCount = 0; + + SocketShutdownSend = false; + SocketDisconnect = false; + + WriteStatus = 0; + WriteError = null; + + ShutdownSendStatus = 0; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index ee43b9f60b..de603b4458 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -18,16 +18,21 @@ namespace Microsoft.AspNet.Server.Kestrel /// public class KestrelThread { + // maximum times the work queues swapped and are processed in a single pass + // as completing a task may immediately have write data to put on the network + // otherwise it needs to wait till the next pass of the libuv loop + private const int _maxLoops = 8; + private static Action _threadCallbackAdapter = (callback, state) => ((Action)callback).Invoke((KestrelThread)state); private KestrelEngine _engine; private readonly IApplicationLifetime _appLifetime; private Thread _thread; private UvLoopHandle _loop; private UvAsyncHandle _post; - private Queue _workAdding = new Queue(); - private Queue _workRunning = new Queue(); - private Queue _closeHandleAdding = new Queue(); - private Queue _closeHandleRunning = new Queue(); + private Queue _workAdding = new Queue(1024); + private Queue _workRunning = new Queue(1024); + private Queue _closeHandleAdding = new Queue(256); + private Queue _closeHandleRunning = new Queue(256); private object _workSync = new Object(); private bool _stopImmediate = false; private bool _initCompleted = false; @@ -249,11 +254,17 @@ namespace Microsoft.AspNet.Server.Kestrel private void OnPost() { - DoPostWork(); - DoPostCloseHandle(); + var loopsRemaining = _maxLoops; + bool wasWork; + do + { + wasWork = DoPostWork(); + wasWork = DoPostCloseHandle() || wasWork; + loopsRemaining--; + } while (wasWork && loopsRemaining > 0); } - private void DoPostWork() + private bool DoPostWork() { Queue queue; lock (_workSync) @@ -262,6 +273,9 @@ namespace Microsoft.AspNet.Server.Kestrel _workAdding = _workRunning; _workRunning = queue; } + + bool wasWork = queue.Count > 0; + while (queue.Count != 0) { var work = queue.Dequeue(); @@ -286,8 +300,10 @@ namespace Microsoft.AspNet.Server.Kestrel } } } + + return wasWork; } - private void DoPostCloseHandle() + private bool DoPostCloseHandle() { Queue queue; lock (_workSync) @@ -296,6 +312,9 @@ namespace Microsoft.AspNet.Server.Kestrel _closeHandleAdding = _closeHandleRunning; _closeHandleRunning = queue; } + + bool wasWork = queue.Count > 0; + while (queue.Count != 0) { var closeHandle = queue.Dequeue(); @@ -309,6 +328,8 @@ namespace Microsoft.AspNet.Server.Kestrel throw; } } + + return wasWork; } private struct Work diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 2695036f32..21a1aaf778 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking /// public class UvWriteReq : UvRequest { - private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb; + private readonly static Libuv.uv_write_cb _uv_write_cb = (IntPtr ptr, int status) => UvWriteCb(ptr, status); private IntPtr _bufs; @@ -22,7 +22,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking private object _state; private const int BUFFER_COUNT = 4; - private List _pins = new List(); + private List _pins = new List(BUFFER_COUNT + 1); public UvWriteReq(IKestrelTrace logger) : base(logger) { diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index d5f86b76aa..bda6bfb651 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. var bufferSize = 1048576; @@ -89,7 +89,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -146,7 +146,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; @@ -227,7 +227,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -304,7 +304,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); // block 1 var start = socketOutput.ProducingStart();