Merge branch 'benaadams/socketoutput-tasks' into dev

This commit is contained in:
Stephen Halter 2015-11-10 15:55:10 -08:00
commit bdbd9ae7a6
2 changed files with 135 additions and 145 deletions

View File

@ -31,7 +31,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private int _numBytesPreCompleted = 0;
private Exception _lastWriteError;
private WriteContext _nextWriteContext;
private readonly Queue<CallbackContext> _callbacksPending;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
public SocketOutput(KestrelThread thread, UvStreamHandle socket, long connectionId, IKestrelTrace log)
{
@ -39,13 +39,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_socket = socket;
_connectionId = connectionId;
_log = log;
_callbacksPending = new Queue<CallbackContext>();
_tasksPending = new Queue<TaskCompletionSource<object>>();
}
public void Write(
public Task WriteAsync(
ArraySegment<byte> buffer,
Action<Exception, object, bool> callback,
object state,
bool immediate = true,
bool socketShutdownSend = false,
bool socketDisconnect = false)
@ -59,7 +57,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_log.ConnectionWrite(_connectionId, buffer.Count);
}
bool triggerCallbackNow = false;
TaskCompletionSource<object> tcs = null;
lock (_lockObj)
{
@ -80,23 +78,26 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
_nextWriteContext.SocketDisconnect = true;
}
// Complete the write task immediately if all previous write tasks have been completed,
// the buffers haven't grown too large, and the last write to the socket succeeded.
triggerCallbackNow = _lastWriteError == null &&
_callbacksPending.Count == 0 &&
_numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted;
if (triggerCallbackNow)
if (!immediate)
{
// immediate==false calls always return complete tasks, because there is guaranteed
// to be a subsequent immediate==true call which will go down one of the previous code-paths
_numBytesPreCompleted += buffer.Count;
}
else if (_lastWriteError == null &&
_tasksPending.Count == 0 &&
_numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted)
{
// Complete the write task immediately if all previous write tasks have been completed,
// the buffers haven't grown too large, and the last write to the socket succeeded.
_numBytesPreCompleted += buffer.Count;
}
else
{
_callbacksPending.Enqueue(new CallbackContext
{
Callback = callback,
State = state,
BytesToWrite = buffer.Count
});
// immediate write, which is not eligable for instant completion above
tcs = new TaskCompletionSource<object>(buffer.Count);
_tasksPending.Enqueue(tcs);
}
if (_writesPending < _maxPendingWrites && immediate)
@ -106,12 +107,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
// Make sure we call user code outside of the lock.
if (triggerCallbackNow)
{
// callback(error, state, calledInline)
callback(null, state, true);
}
// Return TaskCompletionSource's Task if set, otherwise completed Task
return tcs?.Task ?? TaskUtilities.CompletedTask;
}
public void End(ProduceEndType endType)
@ -119,13 +116,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
switch (endType)
{
case ProduceEndType.SocketShutdownSend:
Write(default(ArraySegment<byte>), (error, state, calledInline) => { }, null,
WriteAsync(default(ArraySegment<byte>),
immediate: true,
socketShutdownSend: true,
socketDisconnect: false);
break;
case ProduceEndType.SocketDisconnect:
Write(default(ArraySegment<byte>), (error, state, calledInline) => { }, null,
WriteAsync(default(ArraySegment<byte>),
immediate: true,
socketShutdownSend: false,
socketDisconnect: true);
@ -198,128 +195,54 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// completed writes that we haven't triggered callbacks for yet.
_numBytesPreCompleted -= writeBuffer.Count;
}
// bytesLeftToBuffer can be greater than _maxBytesPreCompleted
// This allows large writes to complete once they've actually finished.
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
while (_callbacksPending.Count > 0 &&
_callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer)
while (_tasksPending.Count > 0 &&
(int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer)
{
var callbackContext = _callbacksPending.Dequeue();
var tcs = _tasksPending.Dequeue();
_numBytesPreCompleted += callbackContext.BytesToWrite;
_numBytesPreCompleted += (int)(tcs.Task.AsyncState);
// callback(error, state, calledInline)
callbackContext.Callback(_lastWriteError, callbackContext.State, false);
if (error == null)
{
ThreadPool.QueueUserWorkItem(
(o) => ((TaskCompletionSource<object>)o).SetResult(null),
tcs);
}
else
{
// error is closure captured
ThreadPool.QueueUserWorkItem(
(o) => ((TaskCompletionSource<object>)o).SetException(error),
tcs);
}
}
// Now that the while loop has completed the following invariants should hold true:
Debug.Assert(_numBytesPreCompleted >= 0);
Debug.Assert(_numBytesPreCompleted <= _maxBytesPreCompleted);
}
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
if (!immediate)
var task = WriteAsync(buffer, immediate);
if (task.Status == TaskStatus.RanToCompletion)
{
// immediate==false calls always return complete tasks, because there is guaranteed
// to be a subsequent immediate==true call which will go down the following code-path
Write(
buffer,
(error, state, calledInline) => { },
null,
immediate: false);
return;
}
// TODO: Optimize task being used, and remove callback model from the underlying Write
var tcs = new TaskCompletionSource<int>();
Write(
buffer,
(error, state, calledInline) =>
{
if (error != null)
{
tcs.SetException(error);
}
else
{
tcs.SetResult(0);
}
},
tcs,
immediate: true);
if (tcs.Task.Status != TaskStatus.RanToCompletion)
else
{
tcs.Task.GetAwaiter().GetResult();
task.GetAwaiter().GetResult();
}
}
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, CancellationToken cancellationToken)
{
if (!immediate)
{
// immediate==false calls always return complete tasks, because there is guaranteed
// to be a subsequent immediate==true call which will go down the following code-path
Write(
buffer,
(error, state, calledInline) => { },
null,
immediate: false);
return TaskUtilities.CompletedTask;
}
// TODO: Optimize task being used, and remove callback model from the underlying Write
var tcs = new TaskCompletionSource<int>();
Write(
buffer,
(error, state, calledInline) =>
{
if (!calledInline)
{
ThreadPool.QueueUserWorkItem(state2 =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (error != null)
{
tcs2.SetException(error);
}
else
{
tcs2.SetResult(0);
}
}, state);
}
else
{
var tcs2 = (TaskCompletionSource<int>)state;
if (error != null)
{
tcs2.SetException(error);
}
else
{
tcs2.SetResult(0);
}
}
},
tcs,
immediate: true);
return tcs.Task;
}
private class CallbackContext
{
// callback(error, state, calledInline)
public Action<Exception, object, bool> Callback;
public object State;
public int BytesToWrite;
return WriteAsync(buffer, immediate);
}
private class WriteContext
@ -341,15 +264,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Buffers = new Queue<ArraySegment<byte>>();
}
/// <summary>
/// Perform any actions needed by this work item. The individual tasks are non-blocking and
/// will continue through to each other in order.
/// </summary>
public void Execute()
{
DoWriteIfNeeded();
}
/// <summary>
/// First step: initiate async write if needed, otherwise go to next step
/// </summary>

View File

@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel;
using Microsoft.AspNet.Server.Kestrel.Http;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
@ -45,15 +46,15 @@ namespace Microsoft.AspNet.Server.KestrelTests
var bufferSize = 1048576;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
var completedWh = new ManualResetEventSlim();
Action<Exception, object, bool> onCompleted = (ex, state, calledInline) =>
{
Assert.Null(ex);
Assert.Null(state);
completedWh.Set();
};
// Act
socketOutput.Write(buffer, onCompleted, null);
socketOutput.WriteAsync(buffer).ContinueWith(
(t) =>
{
Assert.Null(t.Exception);
completedWh.Set();
}
);
// Assert
Assert.True(completedWh.Wait(1000));
@ -89,22 +90,21 @@ namespace Microsoft.AspNet.Server.KestrelTests
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
var completedWh = new ManualResetEventSlim();
Action<Exception, object, bool> onCompleted = (ex, state, calledInline) =>
Action<Task> onCompleted = (Task t) =>
{
Assert.Null(ex);
Assert.Null(state);
Assert.Null(t.Exception);
completedWh.Set();
};
// Act
socketOutput.Write(buffer, onCompleted, null);
socketOutput.WriteAsync(buffer).ContinueWith(onCompleted);
// Assert
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.Write(buffer, onCompleted, null);
socketOutput.WriteAsync(buffer).ContinueWith(onCompleted);
// Assert
// Too many bytes are already pre-completed for the second write to pre-complete.
Assert.False(completedWh.Wait(1000));
@ -115,6 +115,82 @@ namespace Microsoft.AspNet.Server.KestrelTests
Assert.True(completedWh.Wait(1000));
}
}
[Fact]
public void WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted()
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
// Arrange
var mockLibuv = new MockLibuv
{
OnWrite = (socket, buffers, triggerCompleted) =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
}
};
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var socketOutput = new SocketOutput(kestrelThread, socket, 0, trace);
var bufferSize = maxBytesPreCompleted;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
var halfBuffer = new ArraySegment<byte>(data, 0, bufferSize / 2);
var completedWh = new ManualResetEventSlim();
Action<Task> onCompleted = (Task t) =>
{
Assert.Null(t.Exception);
completedWh.Set();
};
// Act
socketOutput.WriteAsync(halfBuffer, false).ContinueWith(onCompleted);
// Assert
// The first write should pre-complete since it is not immediate.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.WriteAsync(halfBuffer).ContinueWith(onCompleted);
// Assert
// The second write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.WriteAsync(halfBuffer, false).ContinueWith(onCompleted);
// Assert
// The third write should pre-complete since it is not immediate, even though too many.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.WriteAsync(halfBuffer).ContinueWith(onCompleted);
// Assert
// Too many bytes are already pre-completed for the fourth write to pre-complete.
Assert.False(completedWh.Wait(1000));
// Act
while (completeQueue.Count > 0)
{
completeQueue.Dequeue()(0);
}
// Assert
// Finishing the first write should allow the second write to pre-complete.
Assert.True(completedWh.Wait(1000));
}
}
private class MockSocket : UvStreamHandle
{