diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs index e3d2eb5603..3266d88d64 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs @@ -19,9 +19,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter public FilteredStreamAdapter( Stream filteredStream, MemoryPool2 memory, - IKestrelTrace logger) + IKestrelTrace logger, + ThreadPoolActions threadPoolActions) { - SocketInput = new SocketInput(memory); + SocketInput = new SocketInput(memory, threadPoolActions); SocketOutput = new StreamSocketOutput(filteredStream, memory); _log = logger; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index c02e295dd3..dd7236f523 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -41,8 +41,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connectionId = Interlocked.Increment(ref _lastConnectionId); - _rawSocketInput = new SocketInput(Memory2); - _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log); + _rawSocketInput = new SocketInput(Memory2, ThreadPoolActions); + _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPoolActions); } public void Start() @@ -124,7 +124,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private void ApplyConnectionFilter() { - var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log); + var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPoolActions); SocketInput = filteredStreamAdapter.SocketInput; SocketOutput = filteredStreamAdapter.SocketOutput; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs index b13d678130..a85634331d 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs @@ -16,6 +16,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private static readonly Action _awaitableIsNotCompleted = () => { }; private readonly MemoryPool2 _memory; + private readonly ThreadPoolActions _threadPoolActions; private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false); private Action _awaitableState; @@ -26,9 +27,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private MemoryPoolBlock2 _pinned; private readonly object _sync = new Object(); - public SocketInput(MemoryPool2 memory) + public SocketInput(MemoryPool2 memory, ThreadPoolActions threadPoolActions) { _memory = memory; + _threadPoolActions = threadPoolActions; _awaitableState = _awaitableIsNotCompleted; } @@ -128,7 +130,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http if (awaitableState != _awaitableIsCompleted && awaitableState != _awaitableIsNotCompleted) { - ThreadPool.QueueUserWorkItem((o) => ((Action)o)(), awaitableState); + _threadPoolActions.Run(awaitableState); } } @@ -188,7 +190,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http if (awaitableState != _awaitableIsCompleted && awaitableState != _awaitableIsNotCompleted) { - Task.Run(awaitableState); + _threadPoolActions.Run(awaitableState); } } @@ -210,7 +212,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } else if (awaitableState == _awaitableIsCompleted) { - ThreadPool.QueueUserWorkItem((o) => ((Action)o)(), continuation); + _threadPoolActions.Run(continuation); } else { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 911a5a9675..ef6b5e4f1f 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -25,6 +25,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private readonly Connection _connection; private readonly long _connectionId; private readonly IKestrelTrace _log; + private readonly ThreadPoolActions _threadPoolActions; // This locks all access to _tail, _isProducing and _returnFromOnProducingComplete. // _head does not require a lock, since it is only used in the ctor and uv thread. @@ -55,13 +56,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http MemoryPool2 memory, Connection connection, long connectionId, - IKestrelTrace log) + IKestrelTrace log, + ThreadPoolActions threadPoolActions) { _thread = thread; _socket = socket; _connection = connection; _connectionId = connectionId; _log = log; + _threadPoolActions = threadPoolActions; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); @@ -218,7 +221,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private static void ReturnBlocks(MemoryPoolBlock2 block) { - while(block != null) + while (block != null) { var returningBlock = block; block = returningBlock.Next; @@ -319,16 +322,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http var tcs = _tasksCompleted.Dequeue(); if (_lastWriteError == null) { - ThreadPool.QueueUserWorkItem( - (o) => ((TaskCompletionSource)o).SetResult(null), - tcs); + _threadPoolActions.Complete(tcs); } else { - // error is closure captured - ThreadPool.QueueUserWorkItem( - (o) => ((TaskCompletionSource)o).SetException(_lastWriteError), - tcs); + _threadPoolActions.Error(tcs, _lastWriteError); } } @@ -462,7 +460,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http var _this = (WriteContext)state; _this.ShutdownSendStatus = status; - _this.Self._log.ConnectionWroteFin(Self._connectionId, status); + _this.Self._log.ConnectionWroteFin(_this.Self._connectionId, status); _this.DoDisconnectIfNeeded(); }, this); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 93c30a55dd..98c9ef3f4f 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -33,12 +33,14 @@ namespace Microsoft.AspNet.Server.Kestrel private bool _initCompleted = false; private ExceptionDispatchInfo _closeError; private IKestrelTrace _log; + private ThreadPoolActions _threadPoolActions; public KestrelThread(KestrelEngine engine) { _engine = engine; _appLifetime = engine.AppLifetime; _log = engine.Log; + _threadPoolActions = engine.ThreadPoolActions; _loop = new UvLoopHandle(_log); _post = new UvAsyncHandle(_log); _thread = new Thread(ThreadStart); @@ -151,7 +153,7 @@ namespace Microsoft.AspNet.Server.Kestrel public Task PostAsync(Action callback, T state) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(); lock (_workSync) { _workAdding.Enqueue(new Work @@ -266,24 +268,14 @@ namespace Microsoft.AspNet.Server.Kestrel work.CallbackAdapter(work.Callback, work.State); if (work.Completion != null) { - ThreadPool.QueueUserWorkItem( - tcs => - { - ((TaskCompletionSource)tcs).SetResult(0); - }, - work.Completion); + _threadPoolActions.Complete(work.Completion); } } catch (Exception ex) { if (work.Completion != null) { - ThreadPool.QueueUserWorkItem( - tcs => - { - ((TaskCompletionSource)tcs).SetException(ex); - }, - work.Completion); + _threadPoolActions.Error(work.Completion, ex); } else { @@ -322,7 +314,7 @@ namespace Microsoft.AspNet.Server.Kestrel public Action CallbackAdapter; public object Callback; public object State; - public TaskCompletionSource Completion; + public TaskCompletionSource Completion; } private struct CloseHandle { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/ThreadPoolActions.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/ThreadPoolActions.cs new file mode 100644 index 0000000000..cbcba6c239 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/ThreadPoolActions.cs @@ -0,0 +1,74 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNet.Server.Kestrel.Http; + +namespace Microsoft.AspNet.Server.Kestrel.Infrastructure +{ + public class ThreadPoolActions + { + private readonly IKestrelTrace _log; + + private readonly WaitCallback _runAction; + private readonly WaitCallback _completeTcs; + + public ThreadPoolActions(IKestrelTrace log) + { + _log = log; + + // Curry and capture log in closures once + _runAction = (o) => + { + try + { + ((Action)o)(); + } + catch (Exception e) + { + _log.ApplicationError(e); + } + }; + + _completeTcs = (o) => + { + try + { + ((TaskCompletionSource)o).TrySetResult(null); + } + catch (Exception e) + { + _log.ApplicationError(e); + } + }; + } + + public void Run(Action action) + { + ThreadPool.QueueUserWorkItem(_runAction, action); + } + + public void Complete(TaskCompletionSource tcs) + { + ThreadPool.QueueUserWorkItem(_completeTcs, tcs); + } + + public void Error(TaskCompletionSource tcs, Exception ex) + { + // ex ang _log are closure captured + ThreadPool.QueueUserWorkItem((o) => + { + try + { + ((TaskCompletionSource)o).TrySetException(ex); + } + catch (Exception e) + { + _log.ApplicationError(e); + } + }, tcs); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs index 62d5d5d758..3be228d5a3 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs @@ -13,6 +13,8 @@ namespace Microsoft.AspNet.Server.Kestrel { public class ServiceContext { + private IKestrelTrace _log; + public ServiceContext() { } @@ -20,7 +22,8 @@ namespace Microsoft.AspNet.Server.Kestrel public ServiceContext(ServiceContext context) { AppLifetime = context.AppLifetime; - Log = context.Log; + _log = context.Log; + ThreadPoolActions = context.ThreadPoolActions; FrameFactory = context.FrameFactory; DateHeaderValueManager = context.DateHeaderValueManager; ConnectionFilter = context.ConnectionFilter; @@ -29,7 +32,20 @@ namespace Microsoft.AspNet.Server.Kestrel public IApplicationLifetime AppLifetime { get; set; } - public IKestrelTrace Log { get; set; } + public IKestrelTrace Log + { + get + { + return _log; + } + set + { + _log = value; + ThreadPoolActions = new ThreadPoolActions(_log); + } + } + + public ThreadPoolActions ThreadPoolActions { get; private set; } public Func, Frame> FrameFactory { get; set; } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs index 8266a1c2e9..b6267e384c 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs @@ -4,6 +4,7 @@ using System; using System.Linq; using System.Text; +using Microsoft.AspNet.Server.Kestrel; using Microsoft.AspNet.Server.Kestrel.Http; using Microsoft.AspNet.Server.Kestrel.Infrastructure; using Xunit; @@ -48,7 +49,9 @@ namespace Microsoft.AspNet.Server.KestrelTests [InlineData("Connection:\r\n \r\nCookie \r\n", 1)] public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders) { - var socketInput = new SocketInput(new MemoryPool2()); + var trace = new KestrelTrace(new TestKestrelTrace()); + var tpa = new ThreadPoolActions(trace); + var socketInput = new SocketInput(new MemoryPool2(), tpa); var headerCollection = new FrameRequestHeaders(); var headerArray = Encoding.ASCII.GetBytes(rawHeaders); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index 629c35407c..4028cfff1d 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -41,7 +41,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var tpa = new ThreadPoolActions(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa); // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. var bufferSize = 1048576; @@ -87,7 +88,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var tpa = new ThreadPoolActions(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -143,7 +145,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var tpa = new ThreadPoolActions(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa); var bufferSize = maxBytesPreCompleted; @@ -223,7 +226,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var tpa = new ThreadPoolActions(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -299,7 +303,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var tpa = new ThreadPoolActions(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa); // block 1 var start = socketOutput.ProducingStart(); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs index a440ea0edf..2337510d32 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs @@ -4,6 +4,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNet.Server.Kestrel; using Microsoft.AspNet.Server.Kestrel.Http; using Microsoft.AspNet.Server.Kestrel.Infrastructure; @@ -13,10 +14,12 @@ namespace Microsoft.AspNet.Server.KestrelTests { public TestInput() { + var trace = new KestrelTrace(new TestKestrelTrace()); + var tpa = new ThreadPoolActions(trace); var memory2 = new MemoryPool2(); FrameContext = new FrameContext { - SocketInput = new SocketInput(memory2), + SocketInput = new SocketInput(memory2, tpa), ConnectionControl = this, FrameControl = this };