diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs index e3d2eb5603..1409940c86 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs @@ -19,9 +19,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter public FilteredStreamAdapter( Stream filteredStream, MemoryPool2 memory, - IKestrelTrace logger) + IKestrelTrace logger, + IThreadPool threadPool) { - SocketInput = new SocketInput(memory); + SocketInput = new SocketInput(memory, threadPool); SocketOutput = new StreamSocketOutput(filteredStream, memory); _log = logger; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index c02e295dd3..b49ffd4316 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -41,8 +41,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connectionId = Interlocked.Increment(ref _lastConnectionId); - _rawSocketInput = new SocketInput(Memory2); - _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log); + _rawSocketInput = new SocketInput(Memory2, ThreadPool); + _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool); } public void Start() @@ -114,7 +114,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { // Frame.Abort calls user code while this method is always // called from a libuv thread. - ThreadPool.QueueUserWorkItem(state => + System.Threading.ThreadPool.QueueUserWorkItem(state => { var connection = (Connection)state; connection._frame.Abort(); @@ -124,7 +124,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private void ApplyConnectionFilter() { - var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log); + var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool); SocketInput = filteredStreamAdapter.SocketInput; SocketOutput = filteredStreamAdapter.SocketOutput; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs index b13d678130..5f850b5d68 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs @@ -16,6 +16,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private static readonly Action _awaitableIsNotCompleted = () => { }; private readonly MemoryPool2 _memory; + private readonly IThreadPool _threadPool; private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false); private Action _awaitableState; @@ -26,9 +27,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private MemoryPoolBlock2 _pinned; private readonly object _sync = new Object(); - public SocketInput(MemoryPool2 memory) + public SocketInput(MemoryPool2 memory, IThreadPool threadPool) { _memory = memory; + _threadPool = threadPool; _awaitableState = _awaitableIsNotCompleted; } @@ -128,7 +130,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http if (awaitableState != _awaitableIsCompleted && awaitableState != _awaitableIsNotCompleted) { - ThreadPool.QueueUserWorkItem((o) => ((Action)o)(), awaitableState); + _threadPool.Run(awaitableState); } } @@ -188,7 +190,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http if (awaitableState != _awaitableIsCompleted && awaitableState != _awaitableIsNotCompleted) { - Task.Run(awaitableState); + _threadPool.Run(awaitableState); } } @@ -210,7 +212,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } else if (awaitableState == _awaitableIsCompleted) { - ThreadPool.QueueUserWorkItem((o) => ((Action)o)(), continuation); + _threadPool.Run(continuation); } else { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 911a5a9675..64c5808efb 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -25,6 +25,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private readonly Connection _connection; private readonly long _connectionId; private readonly IKestrelTrace _log; + private readonly IThreadPool _threadPool; // This locks all access to _tail, _isProducing and _returnFromOnProducingComplete. // _head does not require a lock, since it is only used in the ctor and uv thread. @@ -55,13 +56,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http MemoryPool2 memory, Connection connection, long connectionId, - IKestrelTrace log) + IKestrelTrace log, + IThreadPool threadPool) { _thread = thread; _socket = socket; _connection = connection; _connectionId = connectionId; _log = log; + _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); @@ -218,7 +221,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private static void ReturnBlocks(MemoryPoolBlock2 block) { - while(block != null) + while (block != null) { var returningBlock = block; block = returningBlock.Next; @@ -319,16 +322,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http var tcs = _tasksCompleted.Dequeue(); if (_lastWriteError == null) { - ThreadPool.QueueUserWorkItem( - (o) => ((TaskCompletionSource)o).SetResult(null), - tcs); + _threadPool.Complete(tcs); } else { - // error is closure captured - ThreadPool.QueueUserWorkItem( - (o) => ((TaskCompletionSource)o).SetException(_lastWriteError), - tcs); + _threadPool.Error(tcs, _lastWriteError); } } @@ -462,7 +460,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http var _this = (WriteContext)state; _this.ShutdownSendStatus = status; - _this.Self._log.ConnectionWroteFin(Self._connectionId, status); + _this.Self._log.ConnectionWroteFin(_this.Self._connectionId, status); _this.DoDisconnectIfNeeded(); }, this); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/IThreadPool.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/IThreadPool.cs new file mode 100644 index 0000000000..fde95f4241 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/IThreadPool.cs @@ -0,0 +1,15 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel.Infrastructure +{ + public interface IThreadPool + { + void Complete(TaskCompletionSource tcs); + void Error(TaskCompletionSource tcs, Exception ex); + void Run(Action action); + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index a288ef49ac..ee43b9f60b 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -33,12 +33,14 @@ namespace Microsoft.AspNet.Server.Kestrel private bool _initCompleted = false; private ExceptionDispatchInfo _closeError; private IKestrelTrace _log; + private IThreadPool _threadPool; public KestrelThread(KestrelEngine engine) { _engine = engine; _appLifetime = engine.AppLifetime; _log = engine.Log; + _threadPool = engine.ThreadPool; _loop = new UvLoopHandle(_log); _post = new UvAsyncHandle(_log); _thread = new Thread(ThreadStart); @@ -153,7 +155,7 @@ namespace Microsoft.AspNet.Server.Kestrel public Task PostAsync(Action callback, T state) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(); lock (_workSync) { _workAdding.Enqueue(new Work @@ -268,24 +270,14 @@ namespace Microsoft.AspNet.Server.Kestrel work.CallbackAdapter(work.Callback, work.State); if (work.Completion != null) { - ThreadPool.QueueUserWorkItem( - tcs => - { - ((TaskCompletionSource)tcs).SetResult(0); - }, - work.Completion); + _threadPool.Complete(work.Completion); } } catch (Exception ex) { if (work.Completion != null) { - ThreadPool.QueueUserWorkItem( - tcs => - { - ((TaskCompletionSource)tcs).SetException(ex); - }, - work.Completion); + _threadPool.Error(work.Completion, ex); } else { @@ -324,7 +316,7 @@ namespace Microsoft.AspNet.Server.Kestrel public Action CallbackAdapter; public object Callback; public object State; - public TaskCompletionSource Completion; + public TaskCompletionSource Completion; } private struct CloseHandle { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/LoggingThreadPool.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/LoggingThreadPool.cs new file mode 100644 index 0000000000..967787dc1f --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/LoggingThreadPool.cs @@ -0,0 +1,73 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel.Infrastructure +{ + public class LoggingThreadPool : IThreadPool + { + private readonly IKestrelTrace _log; + + private readonly WaitCallback _runAction; + private readonly WaitCallback _completeTcs; + + public LoggingThreadPool(IKestrelTrace log) + { + _log = log; + + // Curry and capture log in closures once + _runAction = (o) => + { + try + { + ((Action)o)(); + } + catch (Exception e) + { + _log.ApplicationError(e); + } + }; + + _completeTcs = (o) => + { + try + { + ((TaskCompletionSource)o).TrySetResult(null); + } + catch (Exception e) + { + _log.ApplicationError(e); + } + }; + } + + public void Run(Action action) + { + ThreadPool.QueueUserWorkItem(_runAction, action); + } + + public void Complete(TaskCompletionSource tcs) + { + ThreadPool.QueueUserWorkItem(_completeTcs, tcs); + } + + public void Error(TaskCompletionSource tcs, Exception ex) + { + // ex ang _log are closure captured + ThreadPool.QueueUserWorkItem((o) => + { + try + { + ((TaskCompletionSource)o).TrySetException(ex); + } + catch (Exception e) + { + _log.ApplicationError(e); + } + }, tcs); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/KestrelServer.cs b/src/Microsoft.AspNet.Server.Kestrel/KestrelServer.cs index f1e3504d22..b8e9c5dd07 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/KestrelServer.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/KestrelServer.cs @@ -7,6 +7,7 @@ using Microsoft.AspNet.Hosting; using Microsoft.AspNet.Hosting.Server; using Microsoft.AspNet.Http.Features; using Microsoft.AspNet.Server.Kestrel.Http; +using Microsoft.AspNet.Server.Kestrel.Infrastructure; using Microsoft.Extensions.Logging; namespace Microsoft.AspNet.Server.Kestrel @@ -54,6 +55,7 @@ namespace Microsoft.AspNet.Server.Kestrel { var information = (KestrelServerInformation)Features.Get(); var dateHeaderValueManager = new DateHeaderValueManager(); + var trace = new KestrelTrace(_logger); var engine = new KestrelEngine(new ServiceContext { FrameFactory = (context, remoteEP, localEP, prepareRequest) => @@ -61,7 +63,8 @@ namespace Microsoft.AspNet.Server.Kestrel return new Frame(application, context, remoteEP, localEP, prepareRequest); }, AppLifetime = _applicationLifetime, - Log = new KestrelTrace(_logger), + Log = trace, + ThreadPool = new LoggingThreadPool(trace), DateHeaderValueManager = dateHeaderValueManager, ConnectionFilter = information.ConnectionFilter, NoDelay = information.NoDelay diff --git a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs index 62d5d5d758..31eeea898d 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs @@ -21,6 +21,7 @@ namespace Microsoft.AspNet.Server.Kestrel { AppLifetime = context.AppLifetime; Log = context.Log; + ThreadPool = context.ThreadPool; FrameFactory = context.FrameFactory; DateHeaderValueManager = context.DateHeaderValueManager; ConnectionFilter = context.ConnectionFilter; @@ -31,6 +32,8 @@ namespace Microsoft.AspNet.Server.Kestrel public IKestrelTrace Log { get; set; } + public IThreadPool ThreadPool { get; set; } + public Func, Frame> FrameFactory { get; set; } public DateHeaderValueManager DateHeaderValueManager { get; set; } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs index 8266a1c2e9..cc9bc26618 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs @@ -4,6 +4,7 @@ using System; using System.Linq; using System.Text; +using Microsoft.AspNet.Server.Kestrel; using Microsoft.AspNet.Server.Kestrel.Http; using Microsoft.AspNet.Server.Kestrel.Infrastructure; using Xunit; @@ -48,7 +49,9 @@ namespace Microsoft.AspNet.Server.KestrelTests [InlineData("Connection:\r\n \r\nCookie \r\n", 1)] public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders) { - var socketInput = new SocketInput(new MemoryPool2()); + var trace = new KestrelTrace(new TestKestrelTrace()); + var ltp = new LoggingThreadPool(trace); + var socketInput = new SocketInput(new MemoryPool2(), ltp); var headerCollection = new FrameRequestHeaders(); var headerArray = Encoding.ASCII.GetBytes(rawHeaders); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index 629c35407c..dca77cf881 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -41,7 +41,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var ltp = new LoggingThreadPool(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. var bufferSize = 1048576; @@ -87,7 +88,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var ltp = new LoggingThreadPool(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -143,7 +145,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var ltp = new LoggingThreadPool(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); var bufferSize = maxBytesPreCompleted; @@ -223,7 +226,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var ltp = new LoggingThreadPool(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -299,7 +303,8 @@ namespace Microsoft.AspNet.Server.KestrelTests var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + var ltp = new LoggingThreadPool(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); // block 1 var start = socketOutput.ProducingStart(); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs index a440ea0edf..827d47ba84 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs @@ -4,6 +4,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNet.Server.Kestrel; using Microsoft.AspNet.Server.Kestrel.Http; using Microsoft.AspNet.Server.Kestrel.Infrastructure; @@ -13,10 +14,12 @@ namespace Microsoft.AspNet.Server.KestrelTests { public TestInput() { + var trace = new KestrelTrace(new TestKestrelTrace()); + var ltp = new LoggingThreadPool(trace); var memory2 = new MemoryPool2(); FrameContext = new FrameContext { - SocketInput = new SocketInput(memory2), + SocketInput = new SocketInput(memory2, ltp), ConnectionControl = this, FrameControl = this }; diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestServiceContext.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestServiceContext.cs index 9686915699..54dfc3c3b3 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestServiceContext.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestServiceContext.cs @@ -4,6 +4,7 @@ using Microsoft.AspNet.Http; using Microsoft.AspNet.Server.Kestrel; using Microsoft.AspNet.Server.Kestrel.Http; +using Microsoft.AspNet.Server.Kestrel.Infrastructure; namespace Microsoft.AspNet.Server.KestrelTests { @@ -15,6 +16,7 @@ namespace Microsoft.AspNet.Server.KestrelTests { AppLifetime = new LifetimeNotImplemented(); Log = new TestKestrelTrace(); + ThreadPool = new LoggingThreadPool(Log); DateHeaderValueManager = new TestDateHeaderValueManager(); }