From d723f9da210e97ead29cf4fae403ec0a8b4ad452 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 8 Jul 2015 11:50:11 -0700 Subject: [PATCH] Reduce calls to uv_write by calling it with multiple buffers when possible --- .../Http/SocketOutput.cs | 176 +++++++++++++----- 1 file changed, 131 insertions(+), 45 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 4c3e4c8cc8..b611b3cf00 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -3,15 +3,27 @@ using Microsoft.AspNet.Server.Kestrel.Networking; using System; +using System.Collections.Generic; using System.Threading; namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { + private const int _maxPendingWrites = 3; + private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; + private WriteContext _nextWriteContext; + + // The number of write operations that have been scheduled so far + // but have not completed. + private int _writesSending = 0; + + // This locks all access to _nextWriteContext and _writesSending + private readonly object _lockObj = new object(); + public SocketOutput(KestrelThread thread, UvStreamHandle socket) { _thread = thread; @@ -26,71 +38,145 @@ namespace Microsoft.AspNet.Server.Kestrel.Http buffer = new ArraySegment(copy); KestrelTrace.Log.ConnectionWrite(0, buffer.Count); - var req = new ThisWriteReq(); - req.Init(_thread.Loop); - req.Contextualize(this, _socket, buffer, callback, state); - req.Write(); + + var context = new WriteOperation + { + Buffer = buffer, + Callback = callback, + State = state + }; + + lock (_lockObj) + { + if (_nextWriteContext == null) + { + _nextWriteContext = new WriteContext(this); + } + + _nextWriteContext.Operations.Add(context); + + if (_writesSending < _maxPendingWrites) + { + ScheduleWrite(); + _writesSending++; + } + } } - public class ThisWriteReq : UvWriteReq + private void ScheduleWrite() { - SocketOutput _self; - ArraySegment _buffer; - UvStreamHandle _socket; - Action _callback; - object _state; - Exception _callbackError; - - internal void Contextualize( - SocketOutput socketOutput, - UvStreamHandle socket, - ArraySegment buffer, - Action callback, - object state) + _thread.Post(obj => { - _self = socketOutput; - _socket = socket; - _buffer = buffer; - _callback = callback; - _state = state; - } + var self = (SocketOutput)obj; + self.WriteAllPending(); + }, this); + } - public void Write() + // This is called on the libuv event loop + private void WriteAllPending() + { + WriteContext writingContext; + + lock (_lockObj) { - _self._thread.Post(obj => + if (_nextWriteContext != null) { - var req = (ThisWriteReq)obj; - req.Write( - req._socket, - new ArraySegment>( - new[] { req._buffer }), - (r, status, error, state) => ((ThisWriteReq)state).OnWrite(status, error), - req); - }, this); + writingContext = _nextWriteContext; + _nextWriteContext = null; + } + else + { + _writesSending--; + return; + } } - private void OnWrite(int status, Exception error) + try + { + var buffers = new ArraySegment[writingContext.Operations.Count]; + + var i = 0; + foreach (var writeOp in writingContext.Operations) + { + buffers[i] = writeOp.Buffer; + i++; + } + + writingContext.WriteReq.Write(_socket, new ArraySegment>(buffers), (r, status, error, state) => + { + var writtenContext = (WriteContext)state; + writtenContext.Self.OnWriteCompleted(writtenContext.Operations, r, status, error); + }, writingContext); + } + catch + { + lock (_lockObj) + { + // Lock instead of using Interlocked.Decrement so _writesSending + // doesn't change in the middle of executing other synchronized code. + _writesSending--; + } + + throw; + } + } + + // This is called on the libuv event loop + private void OnWriteCompleted(List completedWrites, UvWriteReq req, int status, Exception error) + { + lock (_lockObj) + { + if (_nextWriteContext != null) + { + ScheduleWrite(); + } + else + { + _writesSending--; + } + } + + req.Dispose(); + + foreach (var writeOp in completedWrites) { KestrelTrace.Log.ConnectionWriteCallback(0, status); //NOTE: pool this? - Dispose(); - // Get off the event loop before calling user code! - _callbackError = error; + writeOp.Error = error; ThreadPool.QueueUserWorkItem(obj => { - var req = (ThisWriteReq)obj; - req._callback(req._callbackError, req._state); - }, this); - } + var op = (WriteOperation)obj; + op.Callback(op.Error, op.State); + }, writeOp); + } } - - public bool Flush(Action drained) + private class WriteOperation { - return false; + public ArraySegment Buffer; + public Exception Error; + public Action Callback; + public object State; } + private class WriteContext + { + public WriteContext(SocketOutput self) + { + Self = self; + + WriteReq = new UvWriteReq(); + WriteReq.Init(self._thread.Loop); + + Operations = new List(); + } + + public SocketOutput Self; + + public UvWriteReq WriteReq; + public List Operations; + } } }