diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketOutput.cs index 26621f4022..29124e0039 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketOutput.cs @@ -16,9 +16,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http public class SocketOutput : ISocketOutput { private const int _maxPendingWrites = 3; + // There should be never be more WriteContexts than the max ongoing writes + 1 for the next write to be scheduled. + private const int _maxPooledWriteContexts = _maxPendingWrites + 1; // Well behaved WriteAsync users should await returned task, so there is no need to allocate more per connection by default private const int _initialTaskQueues = 1; - private const int _maxPooledWriteContexts = 32; private static readonly WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock)state); private static readonly Action _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered(); @@ -31,13 +32,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http private readonly IKestrelTrace _log; private readonly IThreadPool _threadPool; - // This locks all access to _tail and _lastStart. - // _head does not require a lock, since it is only used in the ctor and uv thread. + // This locks all access to _tail, _head, _lastStart and _closed. private readonly object _returnLock = new object(); + private bool _closed; private MemoryPoolBlock _head; private MemoryPoolBlock _tail; - private MemoryPoolIterator _lastStart; // This locks access to to all of the below fields @@ -77,9 +77,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http _writeContextPool = new Queue(_maxPooledWriteContexts); _writeReqPool = thread.WriteReqPool; _maxBytesPreCompleted = connection.ServerOptions.Limits.MaxResponseBufferSize; - - _head = thread.Memory.Lease(); - _tail = _head; } public Task WriteAsync( @@ -232,11 +229,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http { Debug.Assert(_lastStart.IsDefault); - if (_tail == null) + if (_closed) { return default(MemoryPoolIterator); } + if (_tail == null) + { + _head = _thread.Memory.Lease(); + _tail = _head; + } + _lastStart = new MemoryPoolIterator(_tail, _tail.End); return _lastStart; @@ -375,8 +378,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http _lastWriteError = error; } - PoolWriteContext(writeContext); - // _numBytesPreCompleted can temporarily go negative in the event there are // completed writes that we haven't triggered callbacks for yet. _numBytesPreCompleted -= bytesWritten; @@ -491,21 +492,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http // Only return the _tail if we aren't between ProducingStart/Complete calls if (_lastStart.IsDefault) { - _tail.Pool.Return(_tail); + _tail?.Pool.Return(_tail); } _head = null; _tail = null; - } - } - - private void PoolWriteContext(WriteContext writeContext) - { - // Called inside _contextLock - if (_writeContextPool.Count < _maxPooledWriteContexts) - { - writeContext.Reset(); - _writeContextPool.Enqueue(writeContext); + _closed = true; } } @@ -554,7 +546,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http private class WriteContext { - private static readonly WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock)state); + private static readonly WaitCallback _returnWrittenBlocks = (state) => ((WriteContext)state).ReturnWrittenBlocks(); private static readonly WaitCallback _completeWrite = (state) => ((WriteContext)state).CompleteOnThreadPool(); private SocketOutput Self; @@ -563,6 +555,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http private MemoryPoolIterator _lockedEnd; private int _bufferCount; + // _returnBlocksCompleted and _writeCompleted help determine when it's safe to pool the WriteContext + // These are both guarded by the _contextLock. + private bool _returnBlocksCompleted; + private bool _writeCompleted; + public int ByteCount; public bool SocketShutdownSend; public bool SocketDisconnect; @@ -588,10 +585,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http return; } - // Sample values locally in case write completes inline - // to allow block to be Reset and still complete this function - var lockedEndBlock = _lockedEnd.Block; - var lockedEndIndex = _lockedEnd.Index; + // Update _head immediate after write is "locked", so the block returning logic + // works correctly when run inline in the write callback. + Self._head = _lockedEnd.Block; + Self._head.Start = _lockedEnd.Index; _writeReq = Self._writeReqPool.Allocate(); @@ -600,14 +597,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http var writeContext = (WriteContext)state; writeContext.PoolWriteReq(writeContext._writeReq); writeContext._writeReq = null; - writeContext.ScheduleReturnFullyWrittenBlocks(); + writeContext.ScheduleReturnWrittenBlocks(); writeContext.WriteStatus = status; writeContext.WriteError = error; writeContext.DoShutdownIfNeeded(); }, this); - - Self._head = lockedEndBlock; - Self._head.Start = lockedEndIndex; } /// @@ -638,7 +632,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http /// /// Third step: disconnect socket if needed, otherwise this work item is complete /// - public void DoDisconnectIfNeeded() + private void DoDisconnectIfNeeded() { if (SocketDisconnect == false || Self._socket.IsClosed) { @@ -655,13 +649,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http CompleteWithContextLock(); } - public void CompleteWithContextLock() + private void CompleteWithContextLock() { if (Monitor.TryEnter(Self._contextLock)) { try { Self.OnWriteCompleted(this); + _writeCompleted = true; + TryPool(); } finally { @@ -674,13 +670,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http } } - public void CompleteOnThreadPool() + private void CompleteOnThreadPool() { lock (Self._contextLock) { try { Self.OnWriteCompleted(this); + _writeCompleted = true; + TryPool(); } catch (Exception ex) { @@ -694,33 +692,43 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http Self._writeReqPool.Return(writeReq); } - private void ScheduleReturnFullyWrittenBlocks() + private void ScheduleReturnWrittenBlocks() { - var block = _lockedStart.Block; - var end = _lockedEnd.Block; - if (block == end) - { - return; - } - - while (block.Next != end) - { - block = block.Next; - } - block.Next = null; - - Self._threadPool.UnsafeRun(_returnWrittenBlocks, _lockedStart.Block); + Self._threadPool.UnsafeRun(_returnWrittenBlocks, this); } - private static void ReturnWrittenBlocks(MemoryPoolBlock block) + private void ReturnWrittenBlocks() { - while (block != null) + var block = _lockedStart.Block; + while (block != _lockedEnd.Block) { var returnBlock = block; block = block.Next; returnBlock.Pool.Return(returnBlock); } + + lock (Self._returnLock) + { + // If everything has been fully written, return _tail. + if (_lockedEnd.Block == Self._tail && + _lockedEnd.Index == Self._tail.End && + Self._lastStart.IsDefault) + { + Debug.Assert(Self._head == Self._tail); + Debug.Assert(Self._tail.Start == Self._tail.End); + + _lockedEnd.Block.Pool.Return(_lockedEnd.Block); + Self._head = null; + Self._tail = null; + } + } + + lock (Self._contextLock) + { + _returnBlocksCompleted = true; + TryPool(); + } } private void LockWrite() @@ -741,12 +749,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); } - public void Reset() + private void TryPool() + { + // Called inside _contextLock + if (_writeCompleted && + _returnBlocksCompleted && + Self._writeContextPool.Count < _maxPooledWriteContexts) + { + Reset(); + Self._writeContextPool.Enqueue(this); + } + } + + private void Reset() { _lockedStart = default(MemoryPoolIterator); _lockedEnd = default(MemoryPoolIterator); _bufferCount = 0; ByteCount = 0; + _writeCompleted = false; + _returnBlocksCompleted = false; SocketShutdownSend = false; SocketDisconnect = false;