Don't use QueueUserWorkItem to trigger write callbacks immediately
- In this case we are off the event loop, so we can invoke the callback directly. - Increase _maxBytesBufferedBeforeThrottling
This commit is contained in:
parent
74fa82bca7
commit
c345849707
|
|
@ -11,12 +11,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
public class SocketOutput : ISocketOutput
|
||||
{
|
||||
private const int _maxPendingWrites = 3;
|
||||
private const int _maxBytesBufferedBeforeThrottling = 65536 / 8;
|
||||
private const int _maxBytesBufferedBeforeThrottling = 65536;
|
||||
|
||||
private readonly KestrelThread _thread;
|
||||
private readonly UvStreamHandle _socket;
|
||||
|
||||
// This locks all access to to all the below
|
||||
// This locks access to to all of the below fields
|
||||
private readonly object _lockObj = new object();
|
||||
|
||||
// The number of write operations that have been scheduled so far
|
||||
|
|
@ -44,17 +44,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
|
||||
KestrelTrace.Log.ConnectionWrite(0, buffer.Count);
|
||||
|
||||
var writeOp = new WriteOperation
|
||||
{
|
||||
Buffer = buffer
|
||||
};
|
||||
|
||||
var callbackContext = new CallbackContext
|
||||
{
|
||||
Callback = callback,
|
||||
State = state,
|
||||
BytesToWrite = buffer.Count
|
||||
};
|
||||
bool triggerCallbackNow = false;
|
||||
|
||||
lock (_lockObj)
|
||||
{
|
||||
|
|
@ -63,20 +53,22 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
_nextWriteContext = new WriteContext(this);
|
||||
}
|
||||
|
||||
_nextWriteContext.Operations.Enqueue(writeOp);
|
||||
_nextWriteContext.Buffers.Enqueue(buffer);
|
||||
_numBytesBuffered += buffer.Count;
|
||||
|
||||
// Complete the write task immediately if all previous write tasks have been completed,
|
||||
// the buffers haven't grown too large, and the last write to the socket succeeded.
|
||||
if (_lastWriteError == null &&
|
||||
_callbacksPending.Count == 0 &&
|
||||
_numBytesBuffered < _maxBytesBufferedBeforeThrottling)
|
||||
triggerCallbackNow = _lastWriteError == null &&
|
||||
_callbacksPending.Count == 0 &&
|
||||
_numBytesBuffered <= _maxBytesBufferedBeforeThrottling;
|
||||
if (!triggerCallbackNow)
|
||||
{
|
||||
TriggerCallback(callbackContext);
|
||||
}
|
||||
else
|
||||
{
|
||||
_callbacksPending.Enqueue(callbackContext);
|
||||
_callbacksPending.Enqueue(new CallbackContext
|
||||
{
|
||||
Callback = callback,
|
||||
State = state,
|
||||
BytesToWrite = buffer.Count
|
||||
});
|
||||
}
|
||||
|
||||
if (_writesPending < _maxPendingWrites)
|
||||
|
|
@ -85,6 +77,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
_writesPending++;
|
||||
}
|
||||
}
|
||||
|
||||
if (triggerCallbackNow)
|
||||
{
|
||||
callback(null, state);
|
||||
}
|
||||
}
|
||||
|
||||
private void ScheduleWrite()
|
||||
|
|
@ -117,19 +114,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
|
||||
try
|
||||
{
|
||||
var buffers = new ArraySegment<byte>[writingContext.Operations.Count];
|
||||
var buffers = new ArraySegment<byte>[writingContext.Buffers.Count];
|
||||
|
||||
var i = 0;
|
||||
foreach (var writeOp in writingContext.Operations)
|
||||
foreach (var buffer in writingContext.Buffers)
|
||||
{
|
||||
buffers[i] = writeOp.Buffer;
|
||||
i++;
|
||||
buffers[i++] = buffer;
|
||||
}
|
||||
|
||||
writingContext.WriteReq.Write(_socket, new ArraySegment<ArraySegment<byte>>(buffers), (r, status, error, state) =>
|
||||
var writeReq = new UvWriteReq();
|
||||
writeReq.Init(_thread.Loop);
|
||||
|
||||
writeReq.Write(_socket, new ArraySegment<ArraySegment<byte>>(buffers), (r, status, error, state) =>
|
||||
{
|
||||
var writtenContext = (WriteContext)state;
|
||||
writtenContext.Self.OnWriteCompleted(writtenContext.Operations, r, status, error);
|
||||
writtenContext.Self.OnWriteCompleted(writtenContext.Buffers, r, status, error);
|
||||
}, writingContext);
|
||||
}
|
||||
catch
|
||||
|
|
@ -146,8 +145,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
}
|
||||
|
||||
// This is called on the libuv event loop
|
||||
private void OnWriteCompleted(Queue<WriteOperation> completedWrites, UvWriteReq req, int status, Exception error)
|
||||
private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, UvWriteReq req, int status, Exception error)
|
||||
{
|
||||
KestrelTrace.Log.ConnectionWriteCallback(0, status);
|
||||
|
||||
lock (_lockObj)
|
||||
{
|
||||
_lastWriteError = error;
|
||||
|
|
@ -161,16 +162,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
_writesPending--;
|
||||
}
|
||||
|
||||
foreach (var writeOp in completedWrites)
|
||||
foreach (var writeBuffer in writtenBuffers)
|
||||
{
|
||||
_numBytesBuffered -= writeOp.Buffer.Count;
|
||||
_numBytesBuffered -= writeBuffer.Count;
|
||||
}
|
||||
|
||||
var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered;
|
||||
while (_callbacksPending.Count > 0 && _callbacksPending.Peek().BytesToWrite < bytesLeftToBuffer)
|
||||
while (_callbacksPending.Count > 0 &&
|
||||
_callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer)
|
||||
{
|
||||
var context = _callbacksPending.Dequeue();
|
||||
TriggerCallback(context);
|
||||
TriggerCallback(_callbacksPending.Dequeue());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -195,27 +196,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
public int BytesToWrite;
|
||||
}
|
||||
|
||||
private class WriteOperation
|
||||
{
|
||||
public ArraySegment<byte> Buffer;
|
||||
}
|
||||
|
||||
private class WriteContext
|
||||
{
|
||||
public WriteContext(SocketOutput self)
|
||||
{
|
||||
Self = self;
|
||||
|
||||
WriteReq = new UvWriteReq();
|
||||
WriteReq.Init(self._thread.Loop);
|
||||
|
||||
Operations = new Queue<WriteOperation>();
|
||||
Buffers = new Queue<ArraySegment<byte>>();
|
||||
}
|
||||
|
||||
public SocketOutput Self;
|
||||
|
||||
public UvWriteReq WriteReq;
|
||||
public Queue<WriteOperation> Operations;
|
||||
public Queue<ArraySegment<byte>> Buffers;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue