diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index d0d218ac5a..d65c93077f 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -31,7 +31,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private int _numBytesPreCompleted = 0; private Exception _lastWriteError; private WriteContext _nextWriteContext; - private readonly Queue _callbacksPending; + private readonly Queue> _tasksPending; public SocketOutput(KestrelThread thread, UvStreamHandle socket, long connectionId, IKestrelTrace log) { @@ -39,13 +39,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _socket = socket; _connectionId = connectionId; _log = log; - _callbacksPending = new Queue(); + _tasksPending = new Queue>(); } - public void Write( + public Task WriteAsync( ArraySegment buffer, - Action callback, - object state, bool immediate = true, bool socketShutdownSend = false, bool socketDisconnect = false) @@ -58,8 +56,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http buffer = new ArraySegment(copy); _log.ConnectionWrite(_connectionId, buffer.Count); } - - bool triggerCallbackNow = false; + + TaskCompletionSource tcs = null; lock (_lockObj) { @@ -80,23 +78,25 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { _nextWriteContext.SocketDisconnect = true; } + // Complete the write task immediately if all previous write tasks have been completed, // the buffers haven't grown too large, and the last write to the socket succeeded. - triggerCallbackNow = _lastWriteError == null && - _callbacksPending.Count == 0 && - _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted; - if (triggerCallbackNow) + if (_lastWriteError == null && + _tasksPending.Count == 0 && + _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted) { _numBytesPreCompleted += buffer.Count; } + else if (immediate) + { + // immediate write, which is not eligable for instant completion above + tcs = new TaskCompletionSource(buffer.Count); + _tasksPending.Enqueue(tcs); + } else { - _callbacksPending.Enqueue(new CallbackContext - { - Callback = callback, - State = state, - BytesToWrite = buffer.Count - }); + // immediate==false calls always return complete tasks, because there is guaranteed + // to be a subsequent immediate==true call which will go down one of the previous code-paths } if (_writesPending < _maxPendingWrites && immediate) @@ -106,12 +106,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } - // Make sure we call user code outside of the lock. - if (triggerCallbackNow) - { - // callback(error, state, calledInline) - callback(null, state, true); - } + // Return TaskCompletionSource's Task if set, otherwise completed Task + return tcs?.Task ?? TaskUtilities.CompletedTask; } public void End(ProduceEndType endType) @@ -119,13 +115,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http switch (endType) { case ProduceEndType.SocketShutdownSend: - Write(default(ArraySegment), (error, state, calledInline) => { }, null, + WriteAsync(default(ArraySegment), immediate: true, socketShutdownSend: true, socketDisconnect: false); break; case ProduceEndType.SocketDisconnect: - Write(default(ArraySegment), (error, state, calledInline) => { }, null, + WriteAsync(default(ArraySegment), immediate: true, socketShutdownSend: false, socketDisconnect: true); @@ -198,20 +194,30 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // completed writes that we haven't triggered callbacks for yet. _numBytesPreCompleted -= writeBuffer.Count; } - - + // bytesLeftToBuffer can be greater than _maxBytesPreCompleted // This allows large writes to complete once they've actually finished. var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted; - while (_callbacksPending.Count > 0 && - _callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer) + while (_tasksPending.Count > 0 && + (int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer) { - var callbackContext = _callbacksPending.Dequeue(); + var tcs = _tasksPending.Dequeue(); - _numBytesPreCompleted += callbackContext.BytesToWrite; + _numBytesPreCompleted += (int)(tcs.Task.AsyncState); - // callback(error, state, calledInline) - callbackContext.Callback(_lastWriteError, callbackContext.State, false); + if (error == null) + { + ThreadPool.QueueUserWorkItem( + (o) => ((TaskCompletionSource)o).SetResult(null), + tcs); + } + else + { + // error is closure captured + ThreadPool.QueueUserWorkItem( + (o) => ((TaskCompletionSource)o).SetException(error), + tcs); + } } // Now that the while loop has completed the following invariants should hold true: @@ -222,104 +228,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http void ISocketOutput.Write(ArraySegment buffer, bool immediate) { - if (!immediate) + var task = WriteAsync(buffer, immediate); + + if (task.Status == TaskStatus.RanToCompletion) { - // immediate==false calls always return complete tasks, because there is guaranteed - // to be a subsequent immediate==true call which will go down the following code-path - Write( - buffer, - (error, state, calledInline) => { }, - null, - immediate: false); return; } - - // TODO: Optimize task being used, and remove callback model from the underlying Write - var tcs = new TaskCompletionSource(); - - Write( - buffer, - (error, state, calledInline) => - { - if (error != null) - { - tcs.SetException(error); - } - else - { - tcs.SetResult(0); - } - }, - tcs, - immediate: true); - - if (tcs.Task.Status != TaskStatus.RanToCompletion) + else { - tcs.Task.GetAwaiter().GetResult(); + task.GetAwaiter().GetResult(); } } Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, CancellationToken cancellationToken) { - if (!immediate) - { - // immediate==false calls always return complete tasks, because there is guaranteed - // to be a subsequent immediate==true call which will go down the following code-path - Write( - buffer, - (error, state, calledInline) => { }, - null, - immediate: false); - return TaskUtilities.CompletedTask; - } - - // TODO: Optimize task being used, and remove callback model from the underlying Write - var tcs = new TaskCompletionSource(); - - Write( - buffer, - (error, state, calledInline) => - { - if (!calledInline) - { - ThreadPool.QueueUserWorkItem(state2 => - { - var tcs2 = (TaskCompletionSource)state2; - if (error != null) - { - tcs2.SetException(error); - } - else - { - tcs2.SetResult(0); - } - }, state); - } - else - { - var tcs2 = (TaskCompletionSource)state; - if (error != null) - { - tcs2.SetException(error); - } - else - { - tcs2.SetResult(0); - } - } - }, - tcs, - immediate: true); - - return tcs.Task; - } - - private class CallbackContext - { - // callback(error, state, calledInline) - public Action Callback; - public object State; - public int BytesToWrite; + return WriteAsync(buffer, immediate); } private class WriteContext @@ -341,15 +264,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http Buffers = new Queue>(); } - /// - /// Perform any actions needed by this work item. The individual tasks are non-blocking and - /// will continue through to each other in order. - /// - public void Execute() - { - DoWriteIfNeeded(); - } - /// /// First step: initiate async write if needed, otherwise go to next step /// diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index 8e0425ce52..4f697ea39f 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using Microsoft.AspNet.Server.Kestrel; using Microsoft.AspNet.Server.Kestrel.Http; using Microsoft.AspNet.Server.Kestrel.Infrastructure; @@ -45,15 +46,15 @@ namespace Microsoft.AspNet.Server.KestrelTests var bufferSize = 1048576; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); var completedWh = new ManualResetEventSlim(); - Action onCompleted = (ex, state, calledInline) => - { - Assert.Null(ex); - Assert.Null(state); - completedWh.Set(); - }; // Act - socketOutput.Write(buffer, onCompleted, null); + socketOutput.WriteAsync(buffer).ContinueWith( + (t) => + { + Assert.Null(t.Exception); + completedWh.Set(); + } + ); // Assert Assert.True(completedWh.Wait(1000)); @@ -89,22 +90,21 @@ namespace Microsoft.AspNet.Server.KestrelTests var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); var completedWh = new ManualResetEventSlim(); - Action onCompleted = (ex, state, calledInline) => + Action onCompleted = (Task t) => { - Assert.Null(ex); - Assert.Null(state); + Assert.Null(t.Exception); completedWh.Set(); }; // Act - socketOutput.Write(buffer, onCompleted, null); + socketOutput.WriteAsync(buffer).ContinueWith(onCompleted); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. Assert.True(completedWh.Wait(1000)); // Arrange completedWh.Reset(); // Act - socketOutput.Write(buffer, onCompleted, null); + socketOutput.WriteAsync(buffer).ContinueWith(onCompleted); // Assert // Too many bytes are already pre-completed for the second write to pre-complete. Assert.False(completedWh.Wait(1000));