From 97d33406243be0db48a57202d180a1bbb858cd4e Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Tue, 24 Nov 2015 03:20:38 +0000 Subject: [PATCH] Resuse writes, initalize queues --- .../Http/SocketOutput.cs | 110 +++++++++++++++--- .../Infrastructure/KestrelThread.cs | 8 +- .../Networking/UvWriteReq.cs | 4 +- 3 files changed, 98 insertions(+), 24 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 13c690d246..71675e5513 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -17,6 +17,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; + private const int _maxPooledWriteContexts = 32; private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); @@ -38,6 +39,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // This locks access to to all of the below fields private readonly object _contextLock = new object(); + private bool _isDisposed = false; // The number of write operations that have been scheduled so far // but have not completed. @@ -48,6 +50,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private WriteContext _nextWriteContext; private readonly Queue> _tasksPending; private readonly Queue> _tasksCompleted; + private readonly Queue _writeContextPool; public SocketOutput( KestrelThread thread, @@ -66,6 +69,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); + _writeContextPool = new Queue(_maxPooledWriteContexts); _head = memory.Lease(); _tail = _head; @@ -92,7 +96,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { if (_nextWriteContext == null) { - _nextWriteContext = new WriteContext(this); + if (_writeContextPool.Count > 0) + { + _nextWriteContext = _writeContextPool.Dequeue(); + } + else + { + _nextWriteContext = new WriteContext(this); + } } if (socketShutdownSend) @@ -274,9 +285,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } // This is called on the libuv event loop - private void OnWriteCompleted(int bytesWritten, int status, Exception error) + private void OnWriteCompleted(WriteContext writeContext) { - _log.ConnectionWriteCallback(_connectionId, status); + var bytesWritten = writeContext.ByteCount; + var status = writeContext.WriteStatus; + var error = writeContext.WriteError; + if (error != null) { @@ -290,6 +304,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http lock (_contextLock) { + PoolWriteContext(writeContext); if (_nextWriteContext != null) { scheduleWrite = true; @@ -332,11 +347,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + _log.ConnectionWriteCallback(_connectionId, status); + if (scheduleWrite) { - // ScheduleWrite(); - // on right thread, fairness issues? - WriteAllPending(); + ScheduleWrite(); } _tasksCompleted.Clear(); @@ -367,6 +382,32 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + private void PoolWriteContext(WriteContext writeContext) + { + // called inside _contextLock + if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts) + { + writeContext.Reset(); + _writeContextPool.Enqueue(writeContext); + } + else + { + writeContext.Dispose(); + } + } + + private void Dispose() + { + lock (_contextLock) + { + _isDisposed = true; + while (_writeContextPool.Count > 0) + { + _writeContextPool.Dequeue().Dispose(); + } + } + } + void ISocketOutput.Write(ArraySegment buffer, bool immediate) { var task = WriteAsync(buffer, immediate); @@ -408,14 +449,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http buffers++; } - private class WriteContext + private class WriteContext : IDisposable { private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state); private MemoryPoolIterator2 _lockedStart; private MemoryPoolIterator2 _lockedEnd; private int _bufferCount; - private int _byteCount; + public int ByteCount; public SocketOutput Self; @@ -425,11 +466,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public int WriteStatus; public Exception WriteError; + private UvWriteReq _writeReq; + public int ShutdownSendStatus; public WriteContext(SocketOutput self) { Self = self; + _writeReq = new UvWriteReq(Self._log); + _writeReq.Init(Self._thread.Loop); } /// @@ -439,18 +484,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { LockWrite(); - if (_byteCount == 0 || Self._socket.IsClosed) + if (ByteCount == 0 || Self._socket.IsClosed) { DoShutdownIfNeeded(); return; } - var writeReq = new UvWriteReq(Self._log); - writeReq.Init(Self._thread.Loop); + // Sample values locally in case write completes inline + // to allow block to be Reset and still complete this function + var lockedEndBlock = _lockedEnd.Block; + var lockedEndIndex = _lockedEnd.Index; - writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => + _writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => { - _writeReq.Dispose(); var _this = (WriteContext)state; _this.ScheduleReturnFullyWrittenBlocks(); _this.WriteStatus = status; @@ -458,8 +504,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _this.DoShutdownIfNeeded(); }, this); - Self._head = _lockedEnd.Block; - Self._head.Start = _lockedEnd.Index; + Self._head = lockedEndBlock; + Self._head.Start = lockedEndIndex; } /// @@ -492,21 +538,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http /// public void DoDisconnectIfNeeded() { - if (SocketDisconnect == false || Self._socket.IsClosed) + if (SocketDisconnect == false) { Complete(); return; } + else if (Self._socket.IsClosed) + { + Self.Dispose(); + Complete(); + return; + } Self._socket.Dispose(); Self.ReturnAllBlocks(); + Self.Dispose(); Self._log.ConnectionStop(Self._connectionId); Complete(); } public void Complete() { - Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError); + Self.OnWriteCompleted(this); } private void ScheduleReturnFullyWrittenBlocks() @@ -555,8 +608,29 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _lockedStart = new MemoryPoolIterator2(head, head.Start); _lockedEnd = new MemoryPoolIterator2(tail, tail.End); + + BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); + } - BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount); + public void Reset() + { + _lockedStart = default(MemoryPoolIterator2); + _lockedEnd = default(MemoryPoolIterator2); + _bufferCount = 0; + ByteCount = 0; + + SocketShutdownSend = false; + SocketDisconnect = false; + + WriteStatus = 0; + WriteError = null; + + ShutdownSendStatus = 0; + } + + public void Dispose() + { + _writeReq.Dispose(); } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index ee43b9f60b..87658395f2 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -24,10 +24,10 @@ namespace Microsoft.AspNet.Server.Kestrel private Thread _thread; private UvLoopHandle _loop; private UvAsyncHandle _post; - private Queue _workAdding = new Queue(); - private Queue _workRunning = new Queue(); - private Queue _closeHandleAdding = new Queue(); - private Queue _closeHandleRunning = new Queue(); + private Queue _workAdding = new Queue(1024); + private Queue _workRunning = new Queue(1024); + private Queue _closeHandleAdding = new Queue(256); + private Queue _closeHandleRunning = new Queue(256); private object _workSync = new Object(); private bool _stopImmediate = false; private bool _initCompleted = false; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 2695036f32..21a1aaf778 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking /// public class UvWriteReq : UvRequest { - private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb; + private readonly static Libuv.uv_write_cb _uv_write_cb = (IntPtr ptr, int status) => UvWriteCb(ptr, status); private IntPtr _bufs; @@ -22,7 +22,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking private object _state; private const int BUFFER_COUNT = 4; - private List _pins = new List(); + private List _pins = new List(BUFFER_COUNT + 1); public UvWriteReq(IKestrelTrace logger) : base(logger) {