Resuse writes, initalize queues

This commit is contained in:
Ben Adams 2015-11-24 03:20:38 +00:00
parent f9d70e601c
commit 97d3340624
3 changed files with 98 additions and 24 deletions

View File

@ -17,6 +17,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32;
private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
@ -38,6 +39,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// This locks access to to all of the below fields
private readonly object _contextLock = new object();
private bool _isDisposed = false;
// The number of write operations that have been scheduled so far
// but have not completed.
@ -48,6 +50,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
private readonly Queue<WriteContext> _writeContextPool;
public SocketOutput(
KestrelThread thread,
@ -66,6 +69,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_head = memory.Lease();
_tail = _head;
@ -92,7 +96,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
if (_nextWriteContext == null)
{
_nextWriteContext = new WriteContext(this);
if (_writeContextPool.Count > 0)
{
_nextWriteContext = _writeContextPool.Dequeue();
}
else
{
_nextWriteContext = new WriteContext(this);
}
}
if (socketShutdownSend)
@ -274,9 +285,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
// This is called on the libuv event loop
private void OnWriteCompleted(int bytesWritten, int status, Exception error)
private void OnWriteCompleted(WriteContext writeContext)
{
_log.ConnectionWriteCallback(_connectionId, status);
var bytesWritten = writeContext.ByteCount;
var status = writeContext.WriteStatus;
var error = writeContext.WriteError;
if (error != null)
{
@ -290,6 +304,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
lock (_contextLock)
{
PoolWriteContext(writeContext);
if (_nextWriteContext != null)
{
scheduleWrite = true;
@ -332,11 +347,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
_log.ConnectionWriteCallback(_connectionId, status);
if (scheduleWrite)
{
// ScheduleWrite();
// on right thread, fairness issues?
WriteAllPending();
ScheduleWrite();
}
_tasksCompleted.Clear();
@ -367,6 +382,32 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
private void PoolWriteContext(WriteContext writeContext)
{
// called inside _contextLock
if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts)
{
writeContext.Reset();
_writeContextPool.Enqueue(writeContext);
}
else
{
writeContext.Dispose();
}
}
private void Dispose()
{
lock (_contextLock)
{
_isDisposed = true;
while (_writeContextPool.Count > 0)
{
_writeContextPool.Dequeue().Dispose();
}
}
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
var task = WriteAsync(buffer, immediate);
@ -408,14 +449,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
buffers++;
}
private class WriteContext
private class WriteContext : IDisposable
{
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);
private MemoryPoolIterator2 _lockedStart;
private MemoryPoolIterator2 _lockedEnd;
private int _bufferCount;
private int _byteCount;
public int ByteCount;
public SocketOutput Self;
@ -425,11 +466,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public int WriteStatus;
public Exception WriteError;
private UvWriteReq _writeReq;
public int ShutdownSendStatus;
public WriteContext(SocketOutput self)
{
Self = self;
_writeReq = new UvWriteReq(Self._log);
_writeReq.Init(Self._thread.Loop);
}
/// <summary>
@ -439,18 +484,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
LockWrite();
if (_byteCount == 0 || Self._socket.IsClosed)
if (ByteCount == 0 || Self._socket.IsClosed)
{
DoShutdownIfNeeded();
return;
}
var writeReq = new UvWriteReq(Self._log);
writeReq.Init(Self._thread.Loop);
// Sample values locally in case write completes inline
// to allow block to be Reset and still complete this function
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;
writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
{
_writeReq.Dispose();
var _this = (WriteContext)state;
_this.ScheduleReturnFullyWrittenBlocks();
_this.WriteStatus = status;
@ -458,8 +504,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_this.DoShutdownIfNeeded();
}, this);
Self._head = _lockedEnd.Block;
Self._head.Start = _lockedEnd.Index;
Self._head = lockedEndBlock;
Self._head.Start = lockedEndIndex;
}
/// <summary>
@ -492,21 +538,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
/// </summary>
public void DoDisconnectIfNeeded()
{
if (SocketDisconnect == false || Self._socket.IsClosed)
if (SocketDisconnect == false)
{
Complete();
return;
}
else if (Self._socket.IsClosed)
{
Self.Dispose();
Complete();
return;
}
Self._socket.Dispose();
Self.ReturnAllBlocks();
Self.Dispose();
Self._log.ConnectionStop(Self._connectionId);
Complete();
}
public void Complete()
{
Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError);
Self.OnWriteCompleted(this);
}
private void ScheduleReturnFullyWrittenBlocks()
@ -555,8 +608,29 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_lockedStart = new MemoryPoolIterator2(head, head.Start);
_lockedEnd = new MemoryPoolIterator2(tail, tail.End);
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
}
BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount);
public void Reset()
{
_lockedStart = default(MemoryPoolIterator2);
_lockedEnd = default(MemoryPoolIterator2);
_bufferCount = 0;
ByteCount = 0;
SocketShutdownSend = false;
SocketDisconnect = false;
WriteStatus = 0;
WriteError = null;
ShutdownSendStatus = 0;
}
public void Dispose()
{
_writeReq.Dispose();
}
}
}

View File

@ -24,10 +24,10 @@ namespace Microsoft.AspNet.Server.Kestrel
private Thread _thread;
private UvLoopHandle _loop;
private UvAsyncHandle _post;
private Queue<Work> _workAdding = new Queue<Work>();
private Queue<Work> _workRunning = new Queue<Work>();
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>();
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>();
private Queue<Work> _workAdding = new Queue<Work>(1024);
private Queue<Work> _workRunning = new Queue<Work>(1024);
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>(256);
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>(256);
private object _workSync = new Object();
private bool _stopImmediate = false;
private bool _initCompleted = false;

View File

@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
/// </summary>
public class UvWriteReq : UvRequest
{
private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb;
private readonly static Libuv.uv_write_cb _uv_write_cb = (IntPtr ptr, int status) => UvWriteCb(ptr, status);
private IntPtr _bufs;
@ -22,7 +22,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
private object _state;
private const int BUFFER_COUNT = 4;
private List<GCHandle> _pins = new List<GCHandle>();
private List<GCHandle> _pins = new List<GCHandle>(BUFFER_COUNT + 1);
public UvWriteReq(IKestrelTrace logger) : base(logger)
{