Add IThreadPool interface

This commit is contained in:
Ben Adams 2015-12-03 09:38:30 +00:00
parent 850d2b0c7e
commit b1e8f0cdea
13 changed files with 57 additions and 41 deletions

View File

@ -20,9 +20,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter
Stream filteredStream,
MemoryPool2 memory,
IKestrelTrace logger,
ThreadPoolActions threadPoolActions)
IThreadPool threadPool)
{
SocketInput = new SocketInput(memory, threadPoolActions);
SocketInput = new SocketInput(memory, threadPool);
SocketOutput = new StreamSocketOutput(filteredStream, memory);
_log = logger;

View File

@ -41,8 +41,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_connectionId = Interlocked.Increment(ref _lastConnectionId);
_rawSocketInput = new SocketInput(Memory2, ThreadPoolActions);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPoolActions);
_rawSocketInput = new SocketInput(Memory2, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool);
}
public void Start()
@ -114,7 +114,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
// Frame.Abort calls user code while this method is always
// called from a libuv thread.
ThreadPool.QueueUserWorkItem(state =>
System.Threading.ThreadPool.QueueUserWorkItem(state =>
{
var connection = (Connection)state;
connection._frame.Abort();
@ -124,7 +124,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private void ApplyConnectionFilter()
{
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPoolActions);
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log, ThreadPool);
SocketInput = filteredStreamAdapter.SocketInput;
SocketOutput = filteredStreamAdapter.SocketOutput;

View File

@ -16,7 +16,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private static readonly Action _awaitableIsNotCompleted = () => { };
private readonly MemoryPool2 _memory;
private readonly ThreadPoolActions _threadPoolActions;
private readonly IThreadPool _threadPool;
private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false);
private Action _awaitableState;
@ -27,10 +27,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private MemoryPoolBlock2 _pinned;
private readonly object _sync = new Object();
public SocketInput(MemoryPool2 memory, ThreadPoolActions threadPoolActions)
public SocketInput(MemoryPool2 memory, IThreadPool threadPool)
{
_memory = memory;
_threadPoolActions = threadPoolActions;
_threadPool = threadPool;
_awaitableState = _awaitableIsNotCompleted;
}
@ -130,7 +130,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
{
_threadPoolActions.Run(awaitableState);
_threadPool.Run(awaitableState);
}
}
@ -190,7 +190,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
{
_threadPoolActions.Run(awaitableState);
_threadPool.Run(awaitableState);
}
}
@ -212,7 +212,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
else if (awaitableState == _awaitableIsCompleted)
{
_threadPoolActions.Run(continuation);
_threadPool.Run(continuation);
}
else
{

View File

@ -25,7 +25,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private readonly Connection _connection;
private readonly long _connectionId;
private readonly IKestrelTrace _log;
private readonly ThreadPoolActions _threadPoolActions;
private readonly IThreadPool _threadPool;
// This locks all access to _tail, _isProducing and _returnFromOnProducingComplete.
// _head does not require a lock, since it is only used in the ctor and uv thread.
@ -57,14 +57,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Connection connection,
long connectionId,
IKestrelTrace log,
ThreadPoolActions threadPoolActions)
IThreadPool threadPool)
{
_thread = thread;
_socket = socket;
_connection = connection;
_connectionId = connectionId;
_log = log;
_threadPoolActions = threadPoolActions;
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
@ -322,11 +322,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
var tcs = _tasksCompleted.Dequeue();
if (_lastWriteError == null)
{
_threadPoolActions.Complete(tcs);
_threadPool.Complete(tcs);
}
else
{
_threadPoolActions.Error(tcs, _lastWriteError);
_threadPool.Error(tcs, _lastWriteError);
}
}

View File

@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Infrastructure
{
public interface IThreadPool
{
void Complete(TaskCompletionSource<object> tcs);
void Error(TaskCompletionSource<object> tcs, Exception ex);
void Run(Action action);
}
}

View File

@ -33,14 +33,14 @@ namespace Microsoft.AspNet.Server.Kestrel
private bool _initCompleted = false;
private ExceptionDispatchInfo _closeError;
private IKestrelTrace _log;
private ThreadPoolActions _threadPoolActions;
private IThreadPool _threadPool;
public KestrelThread(KestrelEngine engine)
{
_engine = engine;
_appLifetime = engine.AppLifetime;
_log = engine.Log;
_threadPoolActions = engine.ThreadPoolActions;
_threadPool = engine.ThreadPool;
_loop = new UvLoopHandle(_log);
_post = new UvAsyncHandle(_log);
_thread = new Thread(ThreadStart);
@ -268,14 +268,14 @@ namespace Microsoft.AspNet.Server.Kestrel
work.CallbackAdapter(work.Callback, work.State);
if (work.Completion != null)
{
_threadPoolActions.Complete(work.Completion);
_threadPool.Complete(work.Completion);
}
}
catch (Exception ex)
{
if (work.Completion != null)
{
_threadPoolActions.Error(work.Completion, ex);
_threadPool.Error(work.Completion, ex);
}
else
{

View File

@ -4,18 +4,17 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Http;
namespace Microsoft.AspNet.Server.Kestrel.Infrastructure
{
public class ThreadPoolActions
public class LoggingThreadPool : IThreadPool
{
private readonly IKestrelTrace _log;
private readonly WaitCallback _runAction;
private readonly WaitCallback _completeTcs;
public ThreadPoolActions(IKestrelTrace log)
public LoggingThreadPool(IKestrelTrace log)
{
_log = log;

View File

@ -64,7 +64,7 @@ namespace Microsoft.AspNet.Server.Kestrel
},
AppLifetime = _applicationLifetime,
Log = trace,
ThreadPoolActions = new ThreadPoolActions(trace),
ThreadPool = new LoggingThreadPool(trace),
DateHeaderValueManager = dateHeaderValueManager,
ConnectionFilter = information.ConnectionFilter,
NoDelay = information.NoDelay

View File

@ -21,7 +21,7 @@ namespace Microsoft.AspNet.Server.Kestrel
{
AppLifetime = context.AppLifetime;
Log = context.Log;
ThreadPoolActions = context.ThreadPoolActions;
ThreadPool = context.ThreadPool;
FrameFactory = context.FrameFactory;
DateHeaderValueManager = context.DateHeaderValueManager;
ConnectionFilter = context.ConnectionFilter;
@ -32,7 +32,7 @@ namespace Microsoft.AspNet.Server.Kestrel
public IKestrelTrace Log { get; set; }
public ThreadPoolActions ThreadPoolActions { get; set; }
public IThreadPool ThreadPool { get; set; }
public Func<ConnectionContext, IPEndPoint, IPEndPoint, Action<IFeatureCollection>, Frame> FrameFactory { get; set; }

View File

@ -50,8 +50,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders)
{
var trace = new KestrelTrace(new TestKestrelTrace());
var tpa = new ThreadPoolActions(trace);
var socketInput = new SocketInput(new MemoryPool2(), tpa);
var ltp = new LoggingThreadPool(trace);
var socketInput = new SocketInput(new MemoryPool2(), ltp);
var headerCollection = new FrameRequestHeaders();
var headerArray = Encoding.ASCII.GetBytes(rawHeaders);

View File

@ -41,8 +41,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var tpa = new ThreadPoolActions(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
var bufferSize = 1048576;
@ -88,8 +88,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var tpa = new ThreadPoolActions(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -145,8 +145,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var tpa = new ThreadPoolActions(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var bufferSize = maxBytesPreCompleted;
@ -226,8 +226,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var tpa = new ThreadPoolActions(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -303,8 +303,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var tpa = new ThreadPoolActions(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, tpa);
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
// block 1
var start = socketOutput.ProducingStart();

View File

@ -15,11 +15,11 @@ namespace Microsoft.AspNet.Server.KestrelTests
public TestInput()
{
var trace = new KestrelTrace(new TestKestrelTrace());
var tpa = new ThreadPoolActions(trace);
var ltp = new LoggingThreadPool(trace);
var memory2 = new MemoryPool2();
FrameContext = new FrameContext
{
SocketInput = new SocketInput(memory2, tpa),
SocketInput = new SocketInput(memory2, ltp),
ConnectionControl = this,
FrameControl = this
};

View File

@ -4,6 +4,7 @@
using Microsoft.AspNet.Http;
using Microsoft.AspNet.Server.Kestrel;
using Microsoft.AspNet.Server.Kestrel.Http;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
namespace Microsoft.AspNet.Server.KestrelTests
{
@ -15,6 +16,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
{
AppLifetime = new LifetimeNotImplemented();
Log = new TestKestrelTrace();
ThreadPool = new LoggingThreadPool(Log);
DateHeaderValueManager = new TestDateHeaderValueManager();
}