Log user code threadpool continuation execptions
This commit is contained in:
parent
10490888d3
commit
5ae9b3c383
|
|
@ -19,9 +19,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter
|
|||
public FilteredStreamAdapter(
|
||||
Stream filteredStream,
|
||||
MemoryPool2 memory,
|
||||
IKestrelTrace logger)
|
||||
IKestrelTrace logger,
|
||||
ThreadPoolActions threadPoolActions)
|
||||
{
|
||||
SocketInput = new SocketInput(memory);
|
||||
SocketInput = new SocketInput(memory, threadPoolActions);
|
||||
SocketOutput = new StreamSocketOutput(filteredStream, memory);
|
||||
|
||||
_log = logger;
|
||||
|
|
|
|||
|
|
@ -41,8 +41,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
|
||||
_connectionId = Interlocked.Increment(ref _lastConnectionId);
|
||||
|
||||
_rawSocketInput = new SocketInput(Memory2);
|
||||
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log);
|
||||
_rawSocketInput = new SocketInput(Memory2, ThreadPoolActions);
|
||||
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPoolActions);
|
||||
}
|
||||
|
||||
public void Start()
|
||||
|
|
@ -124,7 +124,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
|
||||
private void ApplyConnectionFilter()
|
||||
{
|
||||
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log);
|
||||
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPoolActions);
|
||||
|
||||
SocketInput = filteredStreamAdapter.SocketInput;
|
||||
SocketOutput = filteredStreamAdapter.SocketOutput;
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
private static readonly Action _awaitableIsNotCompleted = () => { };
|
||||
|
||||
private readonly MemoryPool2 _memory;
|
||||
private readonly ThreadPoolActions _threadPoolActions;
|
||||
private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false);
|
||||
|
||||
private Action _awaitableState;
|
||||
|
|
@ -26,9 +27,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
private MemoryPoolBlock2 _pinned;
|
||||
private readonly object _sync = new Object();
|
||||
|
||||
public SocketInput(MemoryPool2 memory)
|
||||
public SocketInput(MemoryPool2 memory, ThreadPoolActions threadPoolActions)
|
||||
{
|
||||
_memory = memory;
|
||||
_threadPoolActions = threadPoolActions;
|
||||
_awaitableState = _awaitableIsNotCompleted;
|
||||
}
|
||||
|
||||
|
|
@ -128,7 +130,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
if (awaitableState != _awaitableIsCompleted &&
|
||||
awaitableState != _awaitableIsNotCompleted)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem((o) => ((Action)o)(), awaitableState);
|
||||
_threadPoolActions.Run(awaitableState);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -188,7 +190,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
if (awaitableState != _awaitableIsCompleted &&
|
||||
awaitableState != _awaitableIsNotCompleted)
|
||||
{
|
||||
Task.Run(awaitableState);
|
||||
_threadPoolActions.Run(awaitableState);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -210,7 +212,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
}
|
||||
else if (awaitableState == _awaitableIsCompleted)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem((o) => ((Action)o)(), continuation);
|
||||
_threadPoolActions.Run(continuation);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
private readonly Connection _connection;
|
||||
private readonly long _connectionId;
|
||||
private readonly IKestrelTrace _log;
|
||||
private readonly ThreadPoolActions _threadPoolActions;
|
||||
|
||||
// This locks all access to _tail, _isProducing and _returnFromOnProducingComplete.
|
||||
// _head does not require a lock, since it is only used in the ctor and uv thread.
|
||||
|
|
@ -55,13 +56,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
MemoryPool2 memory,
|
||||
Connection connection,
|
||||
long connectionId,
|
||||
IKestrelTrace log)
|
||||
IKestrelTrace log,
|
||||
ThreadPoolActions threadPoolActions)
|
||||
{
|
||||
_thread = thread;
|
||||
_socket = socket;
|
||||
_connection = connection;
|
||||
_connectionId = connectionId;
|
||||
_log = log;
|
||||
_threadPoolActions = threadPoolActions;
|
||||
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
|
||||
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
|
||||
|
||||
|
|
@ -218,7 +221,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
|
||||
private static void ReturnBlocks(MemoryPoolBlock2 block)
|
||||
{
|
||||
while(block != null)
|
||||
while (block != null)
|
||||
{
|
||||
var returningBlock = block;
|
||||
block = returningBlock.Next;
|
||||
|
|
@ -319,16 +322,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
var tcs = _tasksCompleted.Dequeue();
|
||||
if (_lastWriteError == null)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(
|
||||
(o) => ((TaskCompletionSource<object>)o).SetResult(null),
|
||||
tcs);
|
||||
_threadPoolActions.Complete(tcs);
|
||||
}
|
||||
else
|
||||
{
|
||||
// error is closure captured
|
||||
ThreadPool.QueueUserWorkItem(
|
||||
(o) => ((TaskCompletionSource<object>)o).SetException(_lastWriteError),
|
||||
tcs);
|
||||
_threadPoolActions.Error(tcs, _lastWriteError);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -462,7 +460,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
|||
var _this = (WriteContext)state;
|
||||
_this.ShutdownSendStatus = status;
|
||||
|
||||
_this.Self._log.ConnectionWroteFin(Self._connectionId, status);
|
||||
_this.Self._log.ConnectionWroteFin(_this.Self._connectionId, status);
|
||||
|
||||
_this.DoDisconnectIfNeeded();
|
||||
}, this);
|
||||
|
|
|
|||
|
|
@ -33,12 +33,14 @@ namespace Microsoft.AspNet.Server.Kestrel
|
|||
private bool _initCompleted = false;
|
||||
private ExceptionDispatchInfo _closeError;
|
||||
private IKestrelTrace _log;
|
||||
private ThreadPoolActions _threadPoolActions;
|
||||
|
||||
public KestrelThread(KestrelEngine engine)
|
||||
{
|
||||
_engine = engine;
|
||||
_appLifetime = engine.AppLifetime;
|
||||
_log = engine.Log;
|
||||
_threadPoolActions = engine.ThreadPoolActions;
|
||||
_loop = new UvLoopHandle(_log);
|
||||
_post = new UvAsyncHandle(_log);
|
||||
_thread = new Thread(ThreadStart);
|
||||
|
|
@ -151,7 +153,7 @@ namespace Microsoft.AspNet.Server.Kestrel
|
|||
|
||||
public Task PostAsync<T>(Action<T> callback, T state)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<int>();
|
||||
var tcs = new TaskCompletionSource<object>();
|
||||
lock (_workSync)
|
||||
{
|
||||
_workAdding.Enqueue(new Work
|
||||
|
|
@ -266,24 +268,14 @@ namespace Microsoft.AspNet.Server.Kestrel
|
|||
work.CallbackAdapter(work.Callback, work.State);
|
||||
if (work.Completion != null)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(
|
||||
tcs =>
|
||||
{
|
||||
((TaskCompletionSource<int>)tcs).SetResult(0);
|
||||
},
|
||||
work.Completion);
|
||||
_threadPoolActions.Complete(work.Completion);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
if (work.Completion != null)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(
|
||||
tcs =>
|
||||
{
|
||||
((TaskCompletionSource<int>)tcs).SetException(ex);
|
||||
},
|
||||
work.Completion);
|
||||
_threadPoolActions.Error(work.Completion, ex);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -322,7 +314,7 @@ namespace Microsoft.AspNet.Server.Kestrel
|
|||
public Action<object, object> CallbackAdapter;
|
||||
public object Callback;
|
||||
public object State;
|
||||
public TaskCompletionSource<int> Completion;
|
||||
public TaskCompletionSource<object> Completion;
|
||||
}
|
||||
private struct CloseHandle
|
||||
{
|
||||
|
|
|
|||
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNet.Server.Kestrel.Http;
|
||||
|
||||
namespace Microsoft.AspNet.Server.Kestrel.Infrastructure
|
||||
{
|
||||
public class ThreadPoolActions
|
||||
{
|
||||
private readonly IKestrelTrace _log;
|
||||
|
||||
private readonly WaitCallback _runAction;
|
||||
private readonly WaitCallback _completeTcs;
|
||||
|
||||
public ThreadPoolActions(IKestrelTrace log)
|
||||
{
|
||||
_log = log;
|
||||
|
||||
// Curry and capture log in closures once
|
||||
_runAction = (o) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
((Action)o)();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_log.ApplicationError(e);
|
||||
}
|
||||
};
|
||||
|
||||
_completeTcs = (o) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
((TaskCompletionSource<object>)o).TrySetResult(null);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_log.ApplicationError(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void Run(Action action)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(_runAction, action);
|
||||
}
|
||||
|
||||
public void Complete(TaskCompletionSource<object> tcs)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(_completeTcs, tcs);
|
||||
}
|
||||
|
||||
public void Error(TaskCompletionSource<object> tcs, Exception ex)
|
||||
{
|
||||
// ex ang _log are closure captured
|
||||
ThreadPool.QueueUserWorkItem((o) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
((TaskCompletionSource<object>)o).TrySetException(ex);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_log.ApplicationError(e);
|
||||
}
|
||||
}, tcs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -13,6 +13,8 @@ namespace Microsoft.AspNet.Server.Kestrel
|
|||
{
|
||||
public class ServiceContext
|
||||
{
|
||||
private IKestrelTrace _log;
|
||||
|
||||
public ServiceContext()
|
||||
{
|
||||
}
|
||||
|
|
@ -20,7 +22,8 @@ namespace Microsoft.AspNet.Server.Kestrel
|
|||
public ServiceContext(ServiceContext context)
|
||||
{
|
||||
AppLifetime = context.AppLifetime;
|
||||
Log = context.Log;
|
||||
_log = context.Log;
|
||||
ThreadPoolActions = context.ThreadPoolActions;
|
||||
FrameFactory = context.FrameFactory;
|
||||
DateHeaderValueManager = context.DateHeaderValueManager;
|
||||
ConnectionFilter = context.ConnectionFilter;
|
||||
|
|
@ -29,7 +32,20 @@ namespace Microsoft.AspNet.Server.Kestrel
|
|||
|
||||
public IApplicationLifetime AppLifetime { get; set; }
|
||||
|
||||
public IKestrelTrace Log { get; set; }
|
||||
public IKestrelTrace Log
|
||||
{
|
||||
get
|
||||
{
|
||||
return _log;
|
||||
}
|
||||
set
|
||||
{
|
||||
_log = value;
|
||||
ThreadPoolActions = new ThreadPoolActions(_log);
|
||||
}
|
||||
}
|
||||
|
||||
public ThreadPoolActions ThreadPoolActions { get; private set; }
|
||||
|
||||
public Func<ConnectionContext, IPEndPoint, IPEndPoint, Action<IFeatureCollection>, Frame> FrameFactory { get; set; }
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
using System;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using Microsoft.AspNet.Server.Kestrel;
|
||||
using Microsoft.AspNet.Server.Kestrel.Http;
|
||||
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
|
||||
using Xunit;
|
||||
|
|
@ -48,7 +49,9 @@ namespace Microsoft.AspNet.Server.KestrelTests
|
|||
[InlineData("Connection:\r\n \r\nCookie \r\n", 1)]
|
||||
public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders)
|
||||
{
|
||||
var socketInput = new SocketInput(new MemoryPool2());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var tpa = new ThreadPoolActions(trace);
|
||||
var socketInput = new SocketInput(new MemoryPool2(), tpa);
|
||||
var headerCollection = new FrameRequestHeaders();
|
||||
|
||||
var headerArray = Encoding.ASCII.GetBytes(rawHeaders);
|
||||
|
|
|
|||
|
|
@ -41,7 +41,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace);
|
||||
var tpa = new ThreadPoolActions(trace);
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
|
||||
|
||||
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
|
||||
var bufferSize = 1048576;
|
||||
|
|
@ -87,7 +88,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace);
|
||||
var tpa = new ThreadPoolActions(trace);
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
|
||||
|
||||
var bufferSize = maxBytesPreCompleted;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
|
@ -143,7 +145,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace);
|
||||
var tpa = new ThreadPoolActions(trace);
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
|
||||
|
||||
var bufferSize = maxBytesPreCompleted;
|
||||
|
||||
|
|
@ -223,7 +226,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace);
|
||||
var tpa = new ThreadPoolActions(trace);
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
|
||||
|
||||
var bufferSize = maxBytesPreCompleted;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
|
@ -299,7 +303,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace);
|
||||
var tpa = new ThreadPoolActions(trace);
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
|
||||
|
||||
// block 1
|
||||
var start = socketOutput.ProducingStart();
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNet.Server.Kestrel;
|
||||
using Microsoft.AspNet.Server.Kestrel.Http;
|
||||
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
|
||||
|
||||
|
|
@ -13,10 +14,12 @@ namespace Microsoft.AspNet.Server.KestrelTests
|
|||
{
|
||||
public TestInput()
|
||||
{
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var tpa = new ThreadPoolActions(trace);
|
||||
var memory2 = new MemoryPool2();
|
||||
FrameContext = new FrameContext
|
||||
{
|
||||
SocketInput = new SocketInput(memory2),
|
||||
SocketInput = new SocketInput(memory2, tpa),
|
||||
ConnectionControl = this,
|
||||
FrameControl = this
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in New Issue