Update SocketOutput to not call QueueUserWorkItem unnecessarily

This commit is contained in:
Stephen Halter 2015-09-21 16:14:19 -07:00 committed by Louis DeJardin
parent 789d5b3595
commit dc902f5fc4
2 changed files with 72 additions and 31 deletions

View File

@ -44,7 +44,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public void Write(
ArraySegment<byte> buffer,
Action<Exception, object> callback,
Action<Exception, object, bool> callback,
object state,
bool immediate = true,
bool socketShutdownSend = false,
@ -109,7 +109,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// Make sure we call user code outside of the lock.
if (triggerCallbackNow)
{
callback(null, state);
// callback(error, state, calledInline)
callback(null, state, true);
}
}
@ -190,7 +191,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_numBytesPreCompleted += callbackContext.BytesToWrite;
TriggerCallback(callbackContext);
// callback(error, state, calledInline)
callbackContext.Callback(_lastWriteError, callbackContext.State, false);
}
// Now that the while loop has completed the following invariants should hold true:
@ -199,22 +201,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
private void TriggerCallback(CallbackContext context)
{
context.Error = _lastWriteError;
ThreadPool.QueueUserWorkItem(obj =>
{
var c = (CallbackContext)obj;
c.Callback(c.Error, c.State);
}, context);
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
((ISocketOutput)this).WriteAsync(buffer, immediate).GetAwaiter().GetResult();
}
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, CancellationToken cancellationToken)
{
if (!immediate)
{
@ -222,10 +209,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// to be a subsequent immediate==true call which will go down the following code-path
Write(
buffer,
(error, state) => { },
(error, state, calledInline) => { },
null,
immediate: false);
return TaskUtilities.CompletedTask;
return;
}
// TODO: Optimize task being used, and remove callback model from the underlying Write
@ -233,7 +220,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Write(
buffer,
(error, state) =>
(error, state, calledInline) =>
{
if (error != null)
{
@ -247,6 +234,64 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
tcs,
immediate: true);
if (tcs.Task.Status != TaskStatus.RanToCompletion)
{
tcs.Task.GetAwaiter().GetResult();
}
}
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, CancellationToken cancellationToken)
{
if (!immediate)
{
// immediate==false calls always return complete tasks, because there is guaranteed
// to be a subsequent immediate==true call which will go down the following code-path
Write(
buffer,
(error, state, calledInline) => { },
null,
immediate: false);
return TaskUtilities.CompletedTask;
}
// TODO: Optimize task being used, and remove callback model from the underlying Write
var tcs = new TaskCompletionSource<int>();
Write(
buffer,
(error, state, calledInline) =>
{
if (!calledInline)
{
ThreadPool.QueueUserWorkItem(state2 =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (error != null)
{
tcs2.SetException(error);
}
else
{
tcs2.SetResult(0);
}
}, state);
}
else
{
var tcs2 = (TaskCompletionSource<int>)state;
if (error != null)
{
tcs2.SetException(error);
}
else
{
tcs2.SetResult(0);
}
}
},
tcs,
immediate: true);
return tcs.Task;
}
@ -255,13 +300,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
switch (endType)
{
case ProduceEndType.SocketShutdownSend:
Write(default(ArraySegment<byte>), (error, state) => { }, null,
Write(default(ArraySegment<byte>), (error, state, calledInline) => { }, null,
immediate: true,
socketShutdownSend: true,
socketDisconnect: false);
break;
case ProduceEndType.SocketDisconnect:
Write(default(ArraySegment<byte>), (error, state) => { }, null,
Write(default(ArraySegment<byte>), (error, state, calledInline) => { }, null,
immediate: true,
socketShutdownSend: false,
socketDisconnect: true);
@ -271,8 +316,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private class CallbackContext
{
public Exception Error;
public Action<Exception, object> Callback;
// callback(error, state, calledInline)
public Action<Exception, object, bool> Callback;
public object State;
public int BytesToWrite;
}
@ -328,10 +373,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
writeReq.Init(Self._thread.Loop);
writeReq.Write(Self._socket, new ArraySegment<ArraySegment<byte>>(buffers), (_writeReq, status, error, state) =>
{
if (error != null)
{
var x = 5;
}
_writeReq.Dispose();
var _this = (WriteContext)state;
_this.WriteStatus = status;

View File

@ -45,7 +45,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var bufferSize = 1048576;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
var completedWh = new ManualResetEventSlim();
Action<Exception, object> onCompleted = (ex, state) =>
Action<Exception, object, bool> onCompleted = (ex, state, calledInline) =>
{
Assert.Null(ex);
Assert.Null(state);
@ -89,7 +89,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
var completedWh = new ManualResetEventSlim();
Action<Exception, object> onCompleted = (ex, state) =>
Action<Exception, object, bool> onCompleted = (ex, state, calledInline) =>
{
Assert.Null(ex);
Assert.Null(state);