Reduce calls to uv_write by calling it with multiple buffers when possible

This commit is contained in:
Stephen Halter 2015-07-08 11:50:11 -07:00
parent 166ec72484
commit d723f9da21
1 changed files with 131 additions and 45 deletions

View File

@ -3,15 +3,27 @@
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Collections.Generic;
using System.Threading;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class SocketOutput : ISocketOutput
{
private const int _maxPendingWrites = 3;
private readonly KestrelThread _thread;
private readonly UvStreamHandle _socket;
private WriteContext _nextWriteContext;
// The number of write operations that have been scheduled so far
// but have not completed.
private int _writesSending = 0;
// This locks all access to _nextWriteContext and _writesSending
private readonly object _lockObj = new object();
public SocketOutput(KestrelThread thread, UvStreamHandle socket)
{
_thread = thread;
@ -26,71 +38,145 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
buffer = new ArraySegment<byte>(copy);
KestrelTrace.Log.ConnectionWrite(0, buffer.Count);
var req = new ThisWriteReq();
req.Init(_thread.Loop);
req.Contextualize(this, _socket, buffer, callback, state);
req.Write();
var context = new WriteOperation
{
Buffer = buffer,
Callback = callback,
State = state
};
lock (_lockObj)
{
if (_nextWriteContext == null)
{
_nextWriteContext = new WriteContext(this);
}
_nextWriteContext.Operations.Add(context);
if (_writesSending < _maxPendingWrites)
{
ScheduleWrite();
_writesSending++;
}
}
}
public class ThisWriteReq : UvWriteReq
private void ScheduleWrite()
{
SocketOutput _self;
ArraySegment<byte> _buffer;
UvStreamHandle _socket;
Action<Exception, object> _callback;
object _state;
Exception _callbackError;
internal void Contextualize(
SocketOutput socketOutput,
UvStreamHandle socket,
ArraySegment<byte> buffer,
Action<Exception, object> callback,
object state)
_thread.Post(obj =>
{
_self = socketOutput;
_socket = socket;
_buffer = buffer;
_callback = callback;
_state = state;
}
var self = (SocketOutput)obj;
self.WriteAllPending();
}, this);
}
public void Write()
// This is called on the libuv event loop
private void WriteAllPending()
{
WriteContext writingContext;
lock (_lockObj)
{
_self._thread.Post(obj =>
if (_nextWriteContext != null)
{
var req = (ThisWriteReq)obj;
req.Write(
req._socket,
new ArraySegment<ArraySegment<byte>>(
new[] { req._buffer }),
(r, status, error, state) => ((ThisWriteReq)state).OnWrite(status, error),
req);
}, this);
writingContext = _nextWriteContext;
_nextWriteContext = null;
}
else
{
_writesSending--;
return;
}
}
private void OnWrite(int status, Exception error)
try
{
var buffers = new ArraySegment<byte>[writingContext.Operations.Count];
var i = 0;
foreach (var writeOp in writingContext.Operations)
{
buffers[i] = writeOp.Buffer;
i++;
}
writingContext.WriteReq.Write(_socket, new ArraySegment<ArraySegment<byte>>(buffers), (r, status, error, state) =>
{
var writtenContext = (WriteContext)state;
writtenContext.Self.OnWriteCompleted(writtenContext.Operations, r, status, error);
}, writingContext);
}
catch
{
lock (_lockObj)
{
// Lock instead of using Interlocked.Decrement so _writesSending
// doesn't change in the middle of executing other synchronized code.
_writesSending--;
}
throw;
}
}
// This is called on the libuv event loop
private void OnWriteCompleted(List<WriteOperation> completedWrites, UvWriteReq req, int status, Exception error)
{
lock (_lockObj)
{
if (_nextWriteContext != null)
{
ScheduleWrite();
}
else
{
_writesSending--;
}
}
req.Dispose();
foreach (var writeOp in completedWrites)
{
KestrelTrace.Log.ConnectionWriteCallback(0, status);
//NOTE: pool this?
Dispose();
// Get off the event loop before calling user code!
_callbackError = error;
writeOp.Error = error;
ThreadPool.QueueUserWorkItem(obj =>
{
var req = (ThisWriteReq)obj;
req._callback(req._callbackError, req._state);
}, this);
}
var op = (WriteOperation)obj;
op.Callback(op.Error, op.State);
}, writeOp);
}
}
public bool Flush(Action drained)
private class WriteOperation
{
return false;
public ArraySegment<byte> Buffer;
public Exception Error;
public Action<Exception, object> Callback;
public object State;
}
private class WriteContext
{
public WriteContext(SocketOutput self)
{
Self = self;
WriteReq = new UvWriteReq();
WriteReq.Init(self._thread.Loop);
Operations = new List<WriteOperation>();
}
public SocketOutput Self;
public UvWriteReq WriteReq;
public List<WriteOperation> Operations;
}
}
}