Return last block from SocketOutput when data is fully written

- This reduces Kestrel's memory usage for idle connections.
This commit is contained in:
Stephen Halter 2016-09-22 16:29:42 -07:00
parent 0edf36bd21
commit 7b2f7b94ab
1 changed files with 73 additions and 51 deletions

View File

@ -16,9 +16,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
public class SocketOutput : ISocketOutput
{
private const int _maxPendingWrites = 3;
// There should be never be more WriteContexts than the max ongoing writes + 1 for the next write to be scheduled.
private const int _maxPooledWriteContexts = _maxPendingWrites + 1;
// Well behaved WriteAsync users should await returned task, so there is no need to allocate more per connection by default
private const int _initialTaskQueues = 1;
private const int _maxPooledWriteContexts = 32;
private static readonly WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock)state);
private static readonly Action<object> _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered();
@ -31,13 +32,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private readonly IKestrelTrace _log;
private readonly IThreadPool _threadPool;
// This locks all access to _tail and _lastStart.
// _head does not require a lock, since it is only used in the ctor and uv thread.
// This locks all access to _tail, _head, _lastStart and _closed.
private readonly object _returnLock = new object();
private bool _closed;
private MemoryPoolBlock _head;
private MemoryPoolBlock _tail;
private MemoryPoolIterator _lastStart;
// This locks access to to all of the below fields
@ -77,9 +77,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = thread.WriteReqPool;
_maxBytesPreCompleted = connection.ServerOptions.Limits.MaxResponseBufferSize;
_head = thread.Memory.Lease();
_tail = _head;
}
public Task WriteAsync(
@ -232,11 +229,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
{
Debug.Assert(_lastStart.IsDefault);
if (_tail == null)
if (_closed)
{
return default(MemoryPoolIterator);
}
if (_tail == null)
{
_head = _thread.Memory.Lease();
_tail = _head;
}
_lastStart = new MemoryPoolIterator(_tail, _tail.End);
return _lastStart;
@ -375,8 +378,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
_lastWriteError = error;
}
PoolWriteContext(writeContext);
// _numBytesPreCompleted can temporarily go negative in the event there are
// completed writes that we haven't triggered callbacks for yet.
_numBytesPreCompleted -= bytesWritten;
@ -491,21 +492,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
// Only return the _tail if we aren't between ProducingStart/Complete calls
if (_lastStart.IsDefault)
{
_tail.Pool.Return(_tail);
_tail?.Pool.Return(_tail);
}
_head = null;
_tail = null;
}
}
private void PoolWriteContext(WriteContext writeContext)
{
// Called inside _contextLock
if (_writeContextPool.Count < _maxPooledWriteContexts)
{
writeContext.Reset();
_writeContextPool.Enqueue(writeContext);
_closed = true;
}
}
@ -554,7 +546,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private class WriteContext
{
private static readonly WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock)state);
private static readonly WaitCallback _returnWrittenBlocks = (state) => ((WriteContext)state).ReturnWrittenBlocks();
private static readonly WaitCallback _completeWrite = (state) => ((WriteContext)state).CompleteOnThreadPool();
private SocketOutput Self;
@ -563,6 +555,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private MemoryPoolIterator _lockedEnd;
private int _bufferCount;
// _returnBlocksCompleted and _writeCompleted help determine when it's safe to pool the WriteContext
// These are both guarded by the _contextLock.
private bool _returnBlocksCompleted;
private bool _writeCompleted;
public int ByteCount;
public bool SocketShutdownSend;
public bool SocketDisconnect;
@ -588,10 +585,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
return;
}
// Sample values locally in case write completes inline
// to allow block to be Reset and still complete this function
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;
// Update _head immediate after write is "locked", so the block returning logic
// works correctly when run inline in the write callback.
Self._head = _lockedEnd.Block;
Self._head.Start = _lockedEnd.Index;
_writeReq = Self._writeReqPool.Allocate();
@ -600,14 +597,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
var writeContext = (WriteContext)state;
writeContext.PoolWriteReq(writeContext._writeReq);
writeContext._writeReq = null;
writeContext.ScheduleReturnFullyWrittenBlocks();
writeContext.ScheduleReturnWrittenBlocks();
writeContext.WriteStatus = status;
writeContext.WriteError = error;
writeContext.DoShutdownIfNeeded();
}, this);
Self._head = lockedEndBlock;
Self._head.Start = lockedEndIndex;
}
/// <summary>
@ -638,7 +632,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
/// <summary>
/// Third step: disconnect socket if needed, otherwise this work item is complete
/// </summary>
public void DoDisconnectIfNeeded()
private void DoDisconnectIfNeeded()
{
if (SocketDisconnect == false || Self._socket.IsClosed)
{
@ -655,13 +649,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
CompleteWithContextLock();
}
public void CompleteWithContextLock()
private void CompleteWithContextLock()
{
if (Monitor.TryEnter(Self._contextLock))
{
try
{
Self.OnWriteCompleted(this);
_writeCompleted = true;
TryPool();
}
finally
{
@ -674,13 +670,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
}
}
public void CompleteOnThreadPool()
private void CompleteOnThreadPool()
{
lock (Self._contextLock)
{
try
{
Self.OnWriteCompleted(this);
_writeCompleted = true;
TryPool();
}
catch (Exception ex)
{
@ -694,33 +692,43 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
Self._writeReqPool.Return(writeReq);
}
private void ScheduleReturnFullyWrittenBlocks()
private void ScheduleReturnWrittenBlocks()
{
var block = _lockedStart.Block;
var end = _lockedEnd.Block;
if (block == end)
{
return;
}
while (block.Next != end)
{
block = block.Next;
}
block.Next = null;
Self._threadPool.UnsafeRun(_returnWrittenBlocks, _lockedStart.Block);
Self._threadPool.UnsafeRun(_returnWrittenBlocks, this);
}
private static void ReturnWrittenBlocks(MemoryPoolBlock block)
private void ReturnWrittenBlocks()
{
while (block != null)
var block = _lockedStart.Block;
while (block != _lockedEnd.Block)
{
var returnBlock = block;
block = block.Next;
returnBlock.Pool.Return(returnBlock);
}
lock (Self._returnLock)
{
// If everything has been fully written, return _tail.
if (_lockedEnd.Block == Self._tail &&
_lockedEnd.Index == Self._tail.End &&
Self._lastStart.IsDefault)
{
Debug.Assert(Self._head == Self._tail);
Debug.Assert(Self._tail.Start == Self._tail.End);
_lockedEnd.Block.Pool.Return(_lockedEnd.Block);
Self._head = null;
Self._tail = null;
}
}
lock (Self._contextLock)
{
_returnBlocksCompleted = true;
TryPool();
}
}
private void LockWrite()
@ -741,12 +749,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
}
public void Reset()
private void TryPool()
{
// Called inside _contextLock
if (_writeCompleted &&
_returnBlocksCompleted &&
Self._writeContextPool.Count < _maxPooledWriteContexts)
{
Reset();
Self._writeContextPool.Enqueue(this);
}
}
private void Reset()
{
_lockedStart = default(MemoryPoolIterator);
_lockedEnd = default(MemoryPoolIterator);
_bufferCount = 0;
ByteCount = 0;
_writeCompleted = false;
_returnBlocksCompleted = false;
SocketShutdownSend = false;
SocketDisconnect = false;