Complete WriteAsync Tasks early when there are less than 64KB buffered
This commit is contained in:
parent
5b06a76367
commit
74fa82bca7
|
|
@ -11,23 +11,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
public class SocketOutput : ISocketOutput
|
||||
{
|
||||
private const int _maxPendingWrites = 3;
|
||||
private const int _maxBytesBufferedBeforeThrottling = 65536 / 8;
|
||||
|
||||
private readonly KestrelThread _thread;
|
||||
private readonly UvStreamHandle _socket;
|
||||
|
||||
private WriteContext _nextWriteContext;
|
||||
// This locks all access to to all the below
|
||||
private readonly object _lockObj = new object();
|
||||
|
||||
// The number of write operations that have been scheduled so far
|
||||
// but have not completed.
|
||||
private int _writesSending = 0;
|
||||
private int _writesPending = 0;
|
||||
|
||||
// This locks all access to _nextWriteContext and _writesSending
|
||||
private readonly object _lockObj = new object();
|
||||
private int _numBytesBuffered = 0;
|
||||
private Exception _lastWriteError;
|
||||
private WriteContext _nextWriteContext;
|
||||
private readonly Queue<CallbackContext> _callbacksPending;
|
||||
|
||||
public SocketOutput(KestrelThread thread, UvStreamHandle socket)
|
||||
{
|
||||
_thread = thread;
|
||||
_socket = socket;
|
||||
_callbacksPending = new Queue<CallbackContext>();
|
||||
}
|
||||
|
||||
public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback, object state)
|
||||
|
|
@ -39,11 +44,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
|
||||
KestrelTrace.Log.ConnectionWrite(0, buffer.Count);
|
||||
|
||||
var context = new WriteOperation
|
||||
var writeOp = new WriteOperation
|
||||
{
|
||||
Buffer = buffer
|
||||
};
|
||||
|
||||
var callbackContext = new CallbackContext
|
||||
{
|
||||
Buffer = buffer,
|
||||
Callback = callback,
|
||||
State = state
|
||||
State = state,
|
||||
BytesToWrite = buffer.Count
|
||||
};
|
||||
|
||||
lock (_lockObj)
|
||||
|
|
@ -53,12 +63,26 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
_nextWriteContext = new WriteContext(this);
|
||||
}
|
||||
|
||||
_nextWriteContext.Operations.Add(context);
|
||||
_nextWriteContext.Operations.Enqueue(writeOp);
|
||||
_numBytesBuffered += buffer.Count;
|
||||
|
||||
if (_writesSending < _maxPendingWrites)
|
||||
// Complete the write task immediately if all previous write tasks have been completed,
|
||||
// the buffers haven't grown too large, and the last write to the socket succeeded.
|
||||
if (_lastWriteError == null &&
|
||||
_callbacksPending.Count == 0 &&
|
||||
_numBytesBuffered < _maxBytesBufferedBeforeThrottling)
|
||||
{
|
||||
TriggerCallback(callbackContext);
|
||||
}
|
||||
else
|
||||
{
|
||||
_callbacksPending.Enqueue(callbackContext);
|
||||
}
|
||||
|
||||
if (_writesPending < _maxPendingWrites)
|
||||
{
|
||||
ScheduleWrite();
|
||||
_writesSending++;
|
||||
_writesPending++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -86,7 +110,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
}
|
||||
else
|
||||
{
|
||||
_writesSending--;
|
||||
_writesPending--;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
@ -114,7 +138,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
{
|
||||
// Lock instead of using Interlocked.Decrement so _writesSending
|
||||
// doesn't change in the middle of executing other synchronized code.
|
||||
_writesSending--;
|
||||
_writesPending--;
|
||||
}
|
||||
|
||||
throw;
|
||||
|
|
@ -122,43 +146,58 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
}
|
||||
|
||||
// This is called on the libuv event loop
|
||||
private void OnWriteCompleted(List<WriteOperation> completedWrites, UvWriteReq req, int status, Exception error)
|
||||
private void OnWriteCompleted(Queue<WriteOperation> completedWrites, UvWriteReq req, int status, Exception error)
|
||||
{
|
||||
lock (_lockObj)
|
||||
{
|
||||
_lastWriteError = error;
|
||||
|
||||
if (_nextWriteContext != null)
|
||||
{
|
||||
ScheduleWrite();
|
||||
}
|
||||
else
|
||||
{
|
||||
_writesSending--;
|
||||
_writesPending--;
|
||||
}
|
||||
|
||||
foreach (var writeOp in completedWrites)
|
||||
{
|
||||
_numBytesBuffered -= writeOp.Buffer.Count;
|
||||
}
|
||||
|
||||
var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered;
|
||||
while (_callbacksPending.Count > 0 && _callbacksPending.Peek().BytesToWrite < bytesLeftToBuffer)
|
||||
{
|
||||
var context = _callbacksPending.Dequeue();
|
||||
TriggerCallback(context);
|
||||
}
|
||||
}
|
||||
|
||||
req.Dispose();
|
||||
}
|
||||
|
||||
foreach (var writeOp in completedWrites)
|
||||
private void TriggerCallback(CallbackContext context)
|
||||
{
|
||||
context.Error = _lastWriteError;
|
||||
ThreadPool.QueueUserWorkItem(obj =>
|
||||
{
|
||||
KestrelTrace.Log.ConnectionWriteCallback(0, status);
|
||||
//NOTE: pool this?
|
||||
var c = (CallbackContext)obj;
|
||||
c.Callback(c.Error, c.State);
|
||||
}, context);
|
||||
}
|
||||
|
||||
// Get off the event loop before calling user code!
|
||||
writeOp.Error = error;
|
||||
ThreadPool.QueueUserWorkItem(obj =>
|
||||
{
|
||||
var op = (WriteOperation)obj;
|
||||
op.Callback(op.Error, op.State);
|
||||
}, writeOp);
|
||||
}
|
||||
private class CallbackContext
|
||||
{
|
||||
public Exception Error;
|
||||
public Action<Exception, object> Callback;
|
||||
public object State;
|
||||
public int BytesToWrite;
|
||||
}
|
||||
|
||||
private class WriteOperation
|
||||
{
|
||||
public ArraySegment<byte> Buffer;
|
||||
public Exception Error;
|
||||
public Action<Exception, object> Callback;
|
||||
public object State;
|
||||
}
|
||||
|
||||
private class WriteContext
|
||||
|
|
@ -170,13 +209,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
WriteReq = new UvWriteReq();
|
||||
WriteReq.Init(self._thread.Loop);
|
||||
|
||||
Operations = new List<WriteOperation>();
|
||||
Operations = new Queue<WriteOperation>();
|
||||
}
|
||||
|
||||
public SocketOutput Self;
|
||||
|
||||
public UvWriteReq WriteReq;
|
||||
public List<WriteOperation> Operations;
|
||||
public Queue<WriteOperation> Operations;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue