Merge branch 'benaadams/reduce-sync-over-async' into dev

This commit is contained in:
Stephen Halter 2016-01-19 16:35:51 -08:00
commit 8ea24f05f4
2 changed files with 56 additions and 29 deletions

View File

@ -464,14 +464,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
SocketOutput.Write(data, immediate: false, chunk: true); SocketOutput.Write(data, immediate: false, chunk: true);
} }
private async Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken) private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{ {
await SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken); return SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken);
} }
private void WriteChunkedResponseSuffix() private Task WriteChunkedResponseSuffix()
{ {
SocketOutput.Write(_endChunkedResponseBytes, immediate: true); return SocketOutput.WriteAsync(_endChunkedResponseBytes, immediate: true);
} }
private static ArraySegment<byte> CreateAsciiByteArraySegment(string text) private static ArraySegment<byte> CreateAsciiByteArraySegment(string text)
@ -562,31 +562,41 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return ProduceEndAwaited(); return ProduceEndAwaited();
} }
WriteSuffix(); return WriteSuffix();
return TaskUtilities.CompletedTask;
} }
private async Task ProduceEndAwaited() private async Task ProduceEndAwaited()
{ {
await ProduceStart(immediate: true, appCompleted: true); await ProduceStart(immediate: true, appCompleted: true);
WriteSuffix(); await WriteSuffix();
} }
private void WriteSuffix() private Task WriteSuffix()
{ {
// _autoChunk should be checked after we are sure ProduceStart() has been called // _autoChunk should be checked after we are sure ProduceStart() has been called
// since ProduceStart() may set _autoChunk to true. // since ProduceStart() may set _autoChunk to true.
if (_autoChunk) if (_autoChunk)
{ {
WriteChunkedResponseSuffix(); return WriteAutoChunkSuffixAwaited();
} }
if (_keepAlive) if (_keepAlive)
{ {
ConnectionControl.End(ProduceEndType.ConnectionKeepAlive); ConnectionControl.End(ProduceEndType.ConnectionKeepAlive);
} }
return TaskUtilities.CompletedTask;
}
private async Task WriteAutoChunkSuffixAwaited()
{
await WriteChunkedResponseSuffix();
if (_keepAlive)
{
ConnectionControl.End(ProduceEndType.ConnectionKeepAlive);
}
} }
private Task CreateResponseHeader( private Task CreateResponseHeader(

View File

@ -48,7 +48,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private int _numBytesPreCompleted = 0; private int _numBytesPreCompleted = 0;
private Exception _lastWriteError; private Exception _lastWriteError;
private WriteContext _nextWriteContext; private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending; private readonly Queue<WaitingTask> _tasksPending;
private readonly Queue<WriteContext> _writeContextPool; private readonly Queue<WriteContext> _writeContextPool;
private readonly Queue<UvWriteReq> _writeReqPool; private readonly Queue<UvWriteReq> _writeReqPool;
@ -68,7 +68,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_connectionId = connectionId; _connectionId = connectionId;
_log = log; _log = log;
_threadPool = threadPool; _threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues); _tasksPending = new Queue<WaitingTask>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts); _writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = writeReqPool; _writeReqPool = writeReqPool;
@ -81,7 +81,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
bool immediate = true, bool immediate = true,
bool chunk = false, bool chunk = false,
bool socketShutdownSend = false, bool socketShutdownSend = false,
bool socketDisconnect = false) bool socketDisconnect = false,
bool isSync = false)
{ {
TaskCompletionSource<object> tcs = null; TaskCompletionSource<object> tcs = null;
var scheduleWrite = false; var scheduleWrite = false;
@ -147,7 +148,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{ {
// immediate write, which is not eligable for instant completion above // immediate write, which is not eligable for instant completion above
tcs = new TaskCompletionSource<object>(buffer.Count); tcs = new TaskCompletionSource<object>(buffer.Count);
_tasksPending.Enqueue(tcs); _tasksPending.Enqueue(new WaitingTask() {
CompletionSource = tcs,
BytesToWrite = buffer.Count,
IsSync = isSync
});
} }
if (!_writePending && immediate) if (!_writePending && immediate)
@ -316,21 +321,35 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// This allows large writes to complete once they've actually finished. // This allows large writes to complete once they've actually finished.
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted; var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
while (_tasksPending.Count > 0 && while (_tasksPending.Count > 0 &&
(int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer) (_tasksPending.Peek().BytesToWrite) <= bytesLeftToBuffer)
{ {
var tcs = _tasksPending.Dequeue(); var waitingTask = _tasksPending.Dequeue();
var bytesToWrite = (int)tcs.Task.AsyncState; var bytesToWrite = waitingTask.BytesToWrite;
_numBytesPreCompleted += bytesToWrite; _numBytesPreCompleted += bytesToWrite;
bytesLeftToBuffer -= bytesToWrite; bytesLeftToBuffer -= bytesToWrite;
if (_lastWriteError == null) if (_lastWriteError == null)
{ {
_threadPool.Complete(tcs); if (waitingTask.IsSync)
{
waitingTask.CompletionSource.TrySetResult(null);
}
else
{
_threadPool.Complete(waitingTask.CompletionSource);
}
} }
else else
{ {
_threadPool.Error(tcs, _lastWriteError); if (waitingTask.IsSync)
{
waitingTask.CompletionSource.TrySetException(_lastWriteError);
}
else
{
_threadPool.Error(waitingTask.CompletionSource, _lastWriteError);
}
} }
} }
@ -374,16 +393,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate, bool chunk) void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
{ {
var task = WriteAsync(buffer, immediate, chunk); WriteAsync(buffer, immediate, chunk, isSync: true).GetAwaiter().GetResult();
if (task.Status == TaskStatus.RanToCompletion)
{
return;
}
else
{
task.GetAwaiter().GetResult();
}
} }
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken) Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
@ -634,5 +644,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
ShutdownSendStatus = 0; ShutdownSendStatus = 0;
} }
} }
private struct WaitingTask
{
public bool IsSync;
public int BytesToWrite;
public TaskCompletionSource<object> CompletionSource;
}
} }
} }