From 97d33406243be0db48a57202d180a1bbb858cd4e Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Tue, 24 Nov 2015 03:20:38 +0000 Subject: [PATCH 1/4] Resuse writes, initalize queues --- .../Http/SocketOutput.cs | 110 +++++++++++++++--- .../Infrastructure/KestrelThread.cs | 8 +- .../Networking/UvWriteReq.cs | 4 +- 3 files changed, 98 insertions(+), 24 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 13c690d246..71675e5513 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -17,6 +17,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; + private const int _maxPooledWriteContexts = 32; private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); @@ -38,6 +39,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // This locks access to to all of the below fields private readonly object _contextLock = new object(); + private bool _isDisposed = false; // The number of write operations that have been scheduled so far // but have not completed. @@ -48,6 +50,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private WriteContext _nextWriteContext; private readonly Queue> _tasksPending; private readonly Queue> _tasksCompleted; + private readonly Queue _writeContextPool; public SocketOutput( KestrelThread thread, @@ -66,6 +69,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); + _writeContextPool = new Queue(_maxPooledWriteContexts); _head = memory.Lease(); _tail = _head; @@ -92,7 +96,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { if (_nextWriteContext == null) { - _nextWriteContext = new WriteContext(this); + if (_writeContextPool.Count > 0) + { + _nextWriteContext = _writeContextPool.Dequeue(); + } + else + { + _nextWriteContext = new WriteContext(this); + } } if (socketShutdownSend) @@ -274,9 +285,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } // This is called on the libuv event loop - private void OnWriteCompleted(int bytesWritten, int status, Exception error) + private void OnWriteCompleted(WriteContext writeContext) { - _log.ConnectionWriteCallback(_connectionId, status); + var bytesWritten = writeContext.ByteCount; + var status = writeContext.WriteStatus; + var error = writeContext.WriteError; + if (error != null) { @@ -290,6 +304,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http lock (_contextLock) { + PoolWriteContext(writeContext); if (_nextWriteContext != null) { scheduleWrite = true; @@ -332,11 +347,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + _log.ConnectionWriteCallback(_connectionId, status); + if (scheduleWrite) { - // ScheduleWrite(); - // on right thread, fairness issues? - WriteAllPending(); + ScheduleWrite(); } _tasksCompleted.Clear(); @@ -367,6 +382,32 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + private void PoolWriteContext(WriteContext writeContext) + { + // called inside _contextLock + if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts) + { + writeContext.Reset(); + _writeContextPool.Enqueue(writeContext); + } + else + { + writeContext.Dispose(); + } + } + + private void Dispose() + { + lock (_contextLock) + { + _isDisposed = true; + while (_writeContextPool.Count > 0) + { + _writeContextPool.Dequeue().Dispose(); + } + } + } + void ISocketOutput.Write(ArraySegment buffer, bool immediate) { var task = WriteAsync(buffer, immediate); @@ -408,14 +449,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http buffers++; } - private class WriteContext + private class WriteContext : IDisposable { private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state); private MemoryPoolIterator2 _lockedStart; private MemoryPoolIterator2 _lockedEnd; private int _bufferCount; - private int _byteCount; + public int ByteCount; public SocketOutput Self; @@ -425,11 +466,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public int WriteStatus; public Exception WriteError; + private UvWriteReq _writeReq; + public int ShutdownSendStatus; public WriteContext(SocketOutput self) { Self = self; + _writeReq = new UvWriteReq(Self._log); + _writeReq.Init(Self._thread.Loop); } /// @@ -439,18 +484,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { LockWrite(); - if (_byteCount == 0 || Self._socket.IsClosed) + if (ByteCount == 0 || Self._socket.IsClosed) { DoShutdownIfNeeded(); return; } - var writeReq = new UvWriteReq(Self._log); - writeReq.Init(Self._thread.Loop); + // Sample values locally in case write completes inline + // to allow block to be Reset and still complete this function + var lockedEndBlock = _lockedEnd.Block; + var lockedEndIndex = _lockedEnd.Index; - writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => + _writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => { - _writeReq.Dispose(); var _this = (WriteContext)state; _this.ScheduleReturnFullyWrittenBlocks(); _this.WriteStatus = status; @@ -458,8 +504,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _this.DoShutdownIfNeeded(); }, this); - Self._head = _lockedEnd.Block; - Self._head.Start = _lockedEnd.Index; + Self._head = lockedEndBlock; + Self._head.Start = lockedEndIndex; } /// @@ -492,21 +538,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http /// public void DoDisconnectIfNeeded() { - if (SocketDisconnect == false || Self._socket.IsClosed) + if (SocketDisconnect == false) { Complete(); return; } + else if (Self._socket.IsClosed) + { + Self.Dispose(); + Complete(); + return; + } Self._socket.Dispose(); Self.ReturnAllBlocks(); + Self.Dispose(); Self._log.ConnectionStop(Self._connectionId); Complete(); } public void Complete() { - Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError); + Self.OnWriteCompleted(this); } private void ScheduleReturnFullyWrittenBlocks() @@ -555,8 +608,29 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _lockedStart = new MemoryPoolIterator2(head, head.Start); _lockedEnd = new MemoryPoolIterator2(tail, tail.End); + + BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); + } - BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount); + public void Reset() + { + _lockedStart = default(MemoryPoolIterator2); + _lockedEnd = default(MemoryPoolIterator2); + _bufferCount = 0; + ByteCount = 0; + + SocketShutdownSend = false; + SocketDisconnect = false; + + WriteStatus = 0; + WriteError = null; + + ShutdownSendStatus = 0; + } + + public void Dispose() + { + _writeReq.Dispose(); } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index ee43b9f60b..87658395f2 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -24,10 +24,10 @@ namespace Microsoft.AspNet.Server.Kestrel private Thread _thread; private UvLoopHandle _loop; private UvAsyncHandle _post; - private Queue _workAdding = new Queue(); - private Queue _workRunning = new Queue(); - private Queue _closeHandleAdding = new Queue(); - private Queue _closeHandleRunning = new Queue(); + private Queue _workAdding = new Queue(1024); + private Queue _workRunning = new Queue(1024); + private Queue _closeHandleAdding = new Queue(256); + private Queue _closeHandleRunning = new Queue(256); private object _workSync = new Object(); private bool _stopImmediate = false; private bool _initCompleted = false; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 2695036f32..21a1aaf778 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking /// public class UvWriteReq : UvRequest { - private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb; + private readonly static Libuv.uv_write_cb _uv_write_cb = (IntPtr ptr, int status) => UvWriteCb(ptr, status); private IntPtr _bufs; @@ -22,7 +22,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking private object _state; private const int BUFFER_COUNT = 4; - private List _pins = new List(); + private List _pins = new List(BUFFER_COUNT + 1); public UvWriteReq(IKestrelTrace logger) : base(logger) { From 992664e0dc7ae0ff036be56eb0ca696912a97614 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 25 Nov 2015 04:44:07 +0000 Subject: [PATCH 2/4] Process cascaded work immediately Without waiting for next libuv pass Fix for potential regression in #363 due to bug fix. --- .../Infrastructure/KestrelThread.cs | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 87658395f2..de603b4458 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -18,6 +18,11 @@ namespace Microsoft.AspNet.Server.Kestrel /// public class KestrelThread { + // maximum times the work queues swapped and are processed in a single pass + // as completing a task may immediately have write data to put on the network + // otherwise it needs to wait till the next pass of the libuv loop + private const int _maxLoops = 8; + private static Action _threadCallbackAdapter = (callback, state) => ((Action)callback).Invoke((KestrelThread)state); private KestrelEngine _engine; private readonly IApplicationLifetime _appLifetime; @@ -249,11 +254,17 @@ namespace Microsoft.AspNet.Server.Kestrel private void OnPost() { - DoPostWork(); - DoPostCloseHandle(); + var loopsRemaining = _maxLoops; + bool wasWork; + do + { + wasWork = DoPostWork(); + wasWork = DoPostCloseHandle() || wasWork; + loopsRemaining--; + } while (wasWork && loopsRemaining > 0); } - private void DoPostWork() + private bool DoPostWork() { Queue queue; lock (_workSync) @@ -262,6 +273,9 @@ namespace Microsoft.AspNet.Server.Kestrel _workAdding = _workRunning; _workRunning = queue; } + + bool wasWork = queue.Count > 0; + while (queue.Count != 0) { var work = queue.Dequeue(); @@ -286,8 +300,10 @@ namespace Microsoft.AspNet.Server.Kestrel } } } + + return wasWork; } - private void DoPostCloseHandle() + private bool DoPostCloseHandle() { Queue queue; lock (_workSync) @@ -296,6 +312,9 @@ namespace Microsoft.AspNet.Server.Kestrel _closeHandleAdding = _closeHandleRunning; _closeHandleRunning = queue; } + + bool wasWork = queue.Count > 0; + while (queue.Count != 0) { var closeHandle = queue.Dequeue(); @@ -309,6 +328,8 @@ namespace Microsoft.AspNet.Server.Kestrel throw; } } + + return wasWork; } private struct Work From 3e42904096d6a7d1f53543ee4ad9f5e625456c6e Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 9 Dec 2015 17:12:45 -0800 Subject: [PATCH 3/4] Pool UvWriteReqs instead of SocketOutput.WriteContexts - This allows all connections accepted by the same thread to share a pool --- .../Http/Connection.cs | 2 +- .../Http/Listener.cs | 7 ++ .../Http/ListenerContext.cs | 7 +- .../Http/SocketOutput.cs | 111 ++++++------------ .../SocketOutputTests.cs | 10 +- 5 files changed, 57 insertions(+), 80 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index b49ffd4316..89b9db55e8 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connectionId = Interlocked.Increment(ref _lastConnectionId); _rawSocketInput = new SocketInput(Memory2, ThreadPool); - _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool); + _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool); } public void Start() diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs index 6721756266..5d8c4e8d0d 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs @@ -94,6 +94,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { var socket = (Listener)tcs2.Task.AsyncState; socket.ListenSocket.Dispose(); + + var writeReqPool = socket.WriteReqPool; + while (writeReqPool.Count > 0) + { + writeReqPool.Dequeue().Dispose(); + } + tcs2.SetResult(0); } catch (Exception ex) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs index fecd0afa44..7247afa2bd 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs @@ -1,8 +1,9 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -using Microsoft.AspNet.Http; +using System.Collections.Generic; using Microsoft.AspNet.Server.Kestrel.Infrastructure; +using Microsoft.AspNet.Server.Kestrel.Networking; namespace Microsoft.AspNet.Server.Kestrel.Http { @@ -17,6 +18,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http : base(serviceContext) { Memory2 = new MemoryPool2(); + WriteReqPool = new Queue(SocketOutput.MaxPooledWriteReqs); } public ListenerContext(ListenerContext listenerContext) @@ -25,6 +27,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http ServerAddress = listenerContext.ServerAddress; Thread = listenerContext.Thread; Memory2 = listenerContext.Memory2; + WriteReqPool = listenerContext.WriteReqPool; Log = listenerContext.Log; } @@ -33,5 +36,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public KestrelThread Thread { get; set; } public MemoryPool2 Memory2 { get; set; } + + public Queue WriteReqPool { get; set; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 71675e5513..39dcf3c2c8 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -14,10 +14,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { + public const int MaxPooledWriteReqs = 1024; + private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; - private const int _maxPooledWriteContexts = 32; private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); @@ -39,7 +40,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // This locks access to to all of the below fields private readonly object _contextLock = new object(); - private bool _isDisposed = false; // The number of write operations that have been scheduled so far // but have not completed. @@ -50,7 +50,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private WriteContext _nextWriteContext; private readonly Queue> _tasksPending; private readonly Queue> _tasksCompleted; - private readonly Queue _writeContextPool; + private readonly Queue _writeReqPool; public SocketOutput( KestrelThread thread, @@ -59,7 +59,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http Connection connection, long connectionId, IKestrelTrace log, - IThreadPool threadPool) + IThreadPool threadPool, + Queue writeReqPool) { _thread = thread; _socket = socket; @@ -69,7 +70,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); - _writeContextPool = new Queue(_maxPooledWriteContexts); + _writeReqPool = writeReqPool; _head = memory.Lease(); _tail = _head; @@ -96,14 +97,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { if (_nextWriteContext == null) { - if (_writeContextPool.Count > 0) - { - _nextWriteContext = _writeContextPool.Dequeue(); - } - else - { - _nextWriteContext = new WriteContext(this); - } + _nextWriteContext = new WriteContext(this); } if (socketShutdownSend) @@ -304,7 +298,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http lock (_contextLock) { - PoolWriteContext(writeContext); if (_nextWriteContext != null) { scheduleWrite = true; @@ -382,32 +375,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } - private void PoolWriteContext(WriteContext writeContext) - { - // called inside _contextLock - if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts) - { - writeContext.Reset(); - _writeContextPool.Enqueue(writeContext); - } - else - { - writeContext.Dispose(); - } - } - - private void Dispose() - { - lock (_contextLock) - { - _isDisposed = true; - while (_writeContextPool.Count > 0) - { - _writeContextPool.Dequeue().Dispose(); - } - } - } - void ISocketOutput.Write(ArraySegment buffer, bool immediate) { var task = WriteAsync(buffer, immediate); @@ -449,7 +416,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http buffers++; } - private class WriteContext : IDisposable + private class WriteContext { private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state); @@ -473,8 +440,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public WriteContext(SocketOutput self) { Self = self; - _writeReq = new UvWriteReq(Self._log); - _writeReq.Init(Self._thread.Loop); } /// @@ -495,13 +460,24 @@ namespace Microsoft.AspNet.Server.Kestrel.Http var lockedEndBlock = _lockedEnd.Block; var lockedEndIndex = _lockedEnd.Index; + if (Self._writeReqPool.Count > 0) + { + _writeReq = Self._writeReqPool.Dequeue(); + } + else + { + _writeReq = new UvWriteReq(Self._log); + _writeReq.Init(Self._thread.Loop); + } + _writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => { - var _this = (WriteContext)state; - _this.ScheduleReturnFullyWrittenBlocks(); - _this.WriteStatus = status; - _this.WriteError = error; - _this.DoShutdownIfNeeded(); + var writeContext = (WriteContext)state; + writeContext.PoolWriteReq(writeContext._writeReq); + writeContext.ScheduleReturnFullyWrittenBlocks(); + writeContext.WriteStatus = status; + writeContext.WriteError = error; + writeContext.DoShutdownIfNeeded(); }, this); Self._head = lockedEndBlock; @@ -545,14 +521,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } else if (Self._socket.IsClosed) { - Self.Dispose(); Complete(); return; } Self._socket.Dispose(); Self.ReturnAllBlocks(); - Self.Dispose(); Self._log.ConnectionStop(Self._connectionId); Complete(); } @@ -561,7 +535,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { Self.OnWriteCompleted(this); } - + + private void PoolWriteReq(UvWriteReq writeReq) + { + if (Self._writeReqPool.Count < MaxPooledWriteReqs) + { + Self._writeReqPool.Enqueue(writeReq); + } + else + { + writeReq.Dispose(); + } + } + private void ScheduleReturnFullyWrittenBlocks() { var block = _lockedStart.Block; @@ -608,30 +594,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _lockedStart = new MemoryPoolIterator2(head, head.Start); _lockedEnd = new MemoryPoolIterator2(tail, tail.End); - + BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); } - - public void Reset() - { - _lockedStart = default(MemoryPoolIterator2); - _lockedEnd = default(MemoryPoolIterator2); - _bufferCount = 0; - ByteCount = 0; - - SocketShutdownSend = false; - SocketDisconnect = false; - - WriteStatus = 0; - WriteError = null; - - ShutdownSendStatus = 0; - } - - public void Dispose() - { - _writeReq.Dispose(); - } } } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index d5f86b76aa..bda6bfb651 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. var bufferSize = 1048576; @@ -89,7 +89,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -146,7 +146,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; @@ -227,7 +227,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -304,7 +304,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); // block 1 var start = socketOutput.ProducingStart(); From 5665eba64609edbcab6be957fbae9e8f1703b54f Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Thu, 10 Dec 2015 12:45:58 +0000 Subject: [PATCH 4/4] Pool WriteContexts additionally --- .../Http/SocketOutput.cs | 57 ++++++++++++++----- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 39dcf3c2c8..9a0f9f8a14 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -19,6 +19,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; + private const int _maxPooledWriteContexts = 32; private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); @@ -44,12 +45,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // The number of write operations that have been scheduled so far // but have not completed. private int _writesPending = 0; - private int _numBytesPreCompleted = 0; private Exception _lastWriteError; private WriteContext _nextWriteContext; private readonly Queue> _tasksPending; private readonly Queue> _tasksCompleted; + private readonly Queue _writeContextPool; private readonly Queue _writeReqPool; public SocketOutput( @@ -70,6 +71,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); + _writeContextPool = new Queue(_maxPooledWriteContexts); _writeReqPool = writeReqPool; _head = memory.Lease(); @@ -97,7 +99,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { if (_nextWriteContext == null) { - _nextWriteContext = new WriteContext(this); + if (_writeContextPool.Count > 0) + { + _nextWriteContext = _writeContextPool.Dequeue(); + } + else + { + _nextWriteContext = new WriteContext(this); + } } if (socketShutdownSend) @@ -298,6 +307,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http lock (_contextLock) { + PoolWriteContext(writeContext); if (_nextWriteContext != null) { scheduleWrite = true; @@ -375,6 +385,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + private void PoolWriteContext(WriteContext writeContext) + { + // called inside _contextLock + if (_writeContextPool.Count < _maxPooledWriteContexts) + { + writeContext.Reset(); + _writeContextPool.Enqueue(writeContext); + } + } + void ISocketOutput.Write(ArraySegment buffer, bool immediate) { var task = WriteAsync(buffer, immediate); @@ -420,21 +440,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state); + private SocketOutput Self; + private UvWriteReq _writeReq; private MemoryPoolIterator2 _lockedStart; private MemoryPoolIterator2 _lockedEnd; private int _bufferCount; + public int ByteCount; - - public SocketOutput Self; - public bool SocketShutdownSend; public bool SocketDisconnect; public int WriteStatus; public Exception WriteError; - - private UvWriteReq _writeReq; - public int ShutdownSendStatus; public WriteContext(SocketOutput self) @@ -474,6 +491,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { var writeContext = (WriteContext)state; writeContext.PoolWriteReq(writeContext._writeReq); + writeContext._writeReq = null; writeContext.ScheduleReturnFullyWrittenBlocks(); writeContext.WriteStatus = status; writeContext.WriteError = error; @@ -514,12 +532,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http /// public void DoDisconnectIfNeeded() { - if (SocketDisconnect == false) - { - Complete(); - return; - } - else if (Self._socket.IsClosed) + if (SocketDisconnect == false || Self._socket.IsClosed) { Complete(); return; @@ -597,6 +610,22 @@ namespace Microsoft.AspNet.Server.Kestrel.Http BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); } + + public void Reset() + { + _lockedStart = default(MemoryPoolIterator2); + _lockedEnd = default(MemoryPoolIterator2); + _bufferCount = 0; + ByteCount = 0; + + SocketShutdownSend = false; + SocketDisconnect = false; + + WriteStatus = 0; + WriteError = null; + + ShutdownSendStatus = 0; + } } } }