From 74fa82bca73e381bd251f40750e8b6637a7f6ccd Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Mon, 13 Jul 2015 10:34:19 -0700 Subject: [PATCH] Complete WriteAsync Tasks early when there are less than 64KB buffered --- .../Http/SocketOutput.cs | 99 +++++++++++++------ 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index b611b3cf00..5deeb83672 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -11,23 +11,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public class SocketOutput : ISocketOutput { private const int _maxPendingWrites = 3; + private const int _maxBytesBufferedBeforeThrottling = 65536 / 8; private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; - private WriteContext _nextWriteContext; + // This locks all access to to all the below + private readonly object _lockObj = new object(); // The number of write operations that have been scheduled so far // but have not completed. - private int _writesSending = 0; + private int _writesPending = 0; - // This locks all access to _nextWriteContext and _writesSending - private readonly object _lockObj = new object(); + private int _numBytesBuffered = 0; + private Exception _lastWriteError; + private WriteContext _nextWriteContext; + private readonly Queue _callbacksPending; public SocketOutput(KestrelThread thread, UvStreamHandle socket) { _thread = thread; _socket = socket; + _callbacksPending = new Queue(); } public void Write(ArraySegment buffer, Action callback, object state) @@ -39,11 +44,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http KestrelTrace.Log.ConnectionWrite(0, buffer.Count); - var context = new WriteOperation + var writeOp = new WriteOperation + { + Buffer = buffer + }; + + var callbackContext = new CallbackContext { - Buffer = buffer, Callback = callback, - State = state + State = state, + BytesToWrite = buffer.Count }; lock (_lockObj) @@ -53,12 +63,26 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _nextWriteContext = new WriteContext(this); } - _nextWriteContext.Operations.Add(context); + _nextWriteContext.Operations.Enqueue(writeOp); + _numBytesBuffered += buffer.Count; - if (_writesSending < _maxPendingWrites) + // Complete the write task immediately if all previous write tasks have been completed, + // the buffers haven't grown too large, and the last write to the socket succeeded. + if (_lastWriteError == null && + _callbacksPending.Count == 0 && + _numBytesBuffered < _maxBytesBufferedBeforeThrottling) + { + TriggerCallback(callbackContext); + } + else + { + _callbacksPending.Enqueue(callbackContext); + } + + if (_writesPending < _maxPendingWrites) { ScheduleWrite(); - _writesSending++; + _writesPending++; } } } @@ -86,7 +110,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } else { - _writesSending--; + _writesPending--; return; } } @@ -114,7 +138,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { // Lock instead of using Interlocked.Decrement so _writesSending // doesn't change in the middle of executing other synchronized code. - _writesSending--; + _writesPending--; } throw; @@ -122,43 +146,58 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } // This is called on the libuv event loop - private void OnWriteCompleted(List completedWrites, UvWriteReq req, int status, Exception error) + private void OnWriteCompleted(Queue completedWrites, UvWriteReq req, int status, Exception error) { lock (_lockObj) { + _lastWriteError = error; + if (_nextWriteContext != null) { ScheduleWrite(); } else { - _writesSending--; + _writesPending--; + } + + foreach (var writeOp in completedWrites) + { + _numBytesBuffered -= writeOp.Buffer.Count; + } + + var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered; + while (_callbacksPending.Count > 0 && _callbacksPending.Peek().BytesToWrite < bytesLeftToBuffer) + { + var context = _callbacksPending.Dequeue(); + TriggerCallback(context); } } req.Dispose(); + } - foreach (var writeOp in completedWrites) + private void TriggerCallback(CallbackContext context) + { + context.Error = _lastWriteError; + ThreadPool.QueueUserWorkItem(obj => { - KestrelTrace.Log.ConnectionWriteCallback(0, status); - //NOTE: pool this? + var c = (CallbackContext)obj; + c.Callback(c.Error, c.State); + }, context); + } - // Get off the event loop before calling user code! - writeOp.Error = error; - ThreadPool.QueueUserWorkItem(obj => - { - var op = (WriteOperation)obj; - op.Callback(op.Error, op.State); - }, writeOp); - } + private class CallbackContext + { + public Exception Error; + public Action Callback; + public object State; + public int BytesToWrite; } private class WriteOperation { public ArraySegment Buffer; - public Exception Error; - public Action Callback; - public object State; } private class WriteContext @@ -170,13 +209,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http WriteReq = new UvWriteReq(); WriteReq.Init(self._thread.Loop); - Operations = new List(); + Operations = new Queue(); } public SocketOutput Self; public UvWriteReq WriteReq; - public List Operations; + public Queue Operations; } } }