Remove KestrelThreadPool abstraction and replaced it with PipeScheduler (#2371)
This commit is contained in:
parent
71bff00c0d
commit
e7d3b0c5f9
|
|
@ -86,7 +86,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
internal static PipeOptions GetInputPipeOptions(ServiceContext serviceContext, MemoryPool<byte> memoryPool, PipeScheduler writerScheduler) => new PipeOptions
|
||||
(
|
||||
pool: memoryPool,
|
||||
readerScheduler: serviceContext.ThreadPool,
|
||||
readerScheduler: serviceContext.Scheduler,
|
||||
writerScheduler: writerScheduler,
|
||||
pauseWriterThreshold: serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
|
||||
resumeWriterThreshold: serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
|
||||
|
|
@ -97,7 +97,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
(
|
||||
pool: memoryPool,
|
||||
readerScheduler: readerScheduler,
|
||||
writerScheduler: serviceContext.ThreadPool,
|
||||
writerScheduler: serviceContext.Scheduler,
|
||||
pauseWriterThreshold: GetOutputResponseBufferSize(serviceContext),
|
||||
resumeWriterThreshold: GetOutputResponseBufferSize(serviceContext),
|
||||
useSynchronizationContext: false
|
||||
|
|
|
|||
|
|
@ -422,7 +422,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
Output.Abort(error);
|
||||
|
||||
// Potentially calling user code. CancelRequestAbortedToken logs any exceptions.
|
||||
ServiceContext.ThreadPool.UnsafeRun(state => ((HttpProtocol)state).CancelRequestAbortedToken(), this);
|
||||
ServiceContext.Scheduler.Schedule(state => ((HttpProtocol)state).CancelRequestAbortedToken(), this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1333,7 +1333,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
=> new Pipe(new PipeOptions
|
||||
(
|
||||
pool: _context.MemoryPool,
|
||||
readerScheduler: ServiceContext.ThreadPool,
|
||||
readerScheduler: ServiceContext.Scheduler,
|
||||
writerScheduler: PipeScheduler.Inline,
|
||||
pauseWriterThreshold: 1,
|
||||
resumeWriterThreshold: 1,
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
internal PipeOptions AdaptedInputPipeOptions => new PipeOptions
|
||||
(
|
||||
pool: MemoryPool,
|
||||
readerScheduler: _context.ServiceContext.ThreadPool,
|
||||
readerScheduler: _context.ServiceContext.Scheduler,
|
||||
writerScheduler: PipeScheduler.Inline,
|
||||
pauseWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
|
||||
resumeWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
|
||||
|
|
|
|||
|
|
@ -1,48 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
||||
{
|
||||
public class InlineLoggingThreadPool : KestrelThreadPool
|
||||
{
|
||||
private readonly IKestrelTrace _log;
|
||||
|
||||
public InlineLoggingThreadPool(IKestrelTrace log)
|
||||
{
|
||||
_log = log;
|
||||
}
|
||||
|
||||
public override void Run(Action action)
|
||||
{
|
||||
try
|
||||
{
|
||||
action();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_log.LogError(0, e, "InlineLoggingThreadPool.Run");
|
||||
}
|
||||
}
|
||||
|
||||
public override void UnsafeRun(WaitCallback action, object state)
|
||||
{
|
||||
action(state);
|
||||
}
|
||||
|
||||
public override void Schedule<T>(Action<T> action, T state)
|
||||
{
|
||||
try
|
||||
{
|
||||
action(state);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_log.LogError(0, e, "InlineLoggingThreadPool.Schedule");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,15 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
||||
{
|
||||
public abstract class KestrelThreadPool : PipeScheduler
|
||||
{
|
||||
public abstract void Run(Action action);
|
||||
public abstract void UnsafeRun(WaitCallback action, object state);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,58 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
||||
{
|
||||
public class LoggingThreadPool : KestrelThreadPool
|
||||
{
|
||||
private readonly IKestrelTrace _log;
|
||||
|
||||
private WaitCallback _runAction;
|
||||
|
||||
public LoggingThreadPool(IKestrelTrace log)
|
||||
{
|
||||
_log = log;
|
||||
|
||||
// Curry and capture log in closures once
|
||||
// The currying is done in functions of the same name to improve the
|
||||
// call stack for exceptions and profiling else it shows up as LoggingThreadPool.ctor>b__4_0
|
||||
// and you aren't sure which of the 3 functions was called.
|
||||
RunAction();
|
||||
}
|
||||
|
||||
private void RunAction()
|
||||
{
|
||||
// Capture _log in a singleton closure
|
||||
_runAction = (o) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
((Action)o)();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_log.LogError(0, e, "LoggingThreadPool.Run");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public override void Run(Action action)
|
||||
{
|
||||
System.Threading.ThreadPool.QueueUserWorkItem(_runAction, action);
|
||||
}
|
||||
|
||||
public override void UnsafeRun(WaitCallback action, object state)
|
||||
{
|
||||
System.Threading.ThreadPool.QueueUserWorkItem(action, state);
|
||||
}
|
||||
|
||||
public override void Schedule<T>(Action<T> action, T state)
|
||||
{
|
||||
Run(() => action(state));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.IO.Pipelines;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
|
||||
|
|
@ -10,7 +11,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
{
|
||||
public IKestrelTrace Log { get; set; }
|
||||
|
||||
public KestrelThreadPool ThreadPool { get; set; }
|
||||
public PipeScheduler Scheduler { get; set; }
|
||||
|
||||
public IHttpParser<Http1ParsingHandler> HttpParser { get; set; }
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Hosting.Server;
|
||||
|
|
@ -79,15 +80,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
|
|||
|
||||
// TODO: This logic will eventually move into the IConnectionHandler<T> and off
|
||||
// the service context once we get to https://github.com/aspnet/KestrelHttpServer/issues/1662
|
||||
KestrelThreadPool threadPool = null;
|
||||
PipeScheduler scheduler = null;
|
||||
switch (serverOptions.ApplicationSchedulingMode)
|
||||
{
|
||||
case SchedulingMode.Default:
|
||||
case SchedulingMode.ThreadPool:
|
||||
threadPool = new LoggingThreadPool(trace);
|
||||
scheduler = PipeScheduler.ThreadPool;
|
||||
break;
|
||||
case SchedulingMode.Inline:
|
||||
threadPool = new InlineLoggingThreadPool(trace);
|
||||
scheduler = PipeScheduler.Inline;
|
||||
break;
|
||||
default:
|
||||
throw new NotSupportedException(CoreStrings.FormatUnknownTransportMode(serverOptions.ApplicationSchedulingMode));
|
||||
|
|
@ -97,7 +98,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
|
|||
{
|
||||
Log = trace,
|
||||
HttpParser = new HttpParser<Http1ParsingHandler>(trace.IsEnabled(LogLevel.Information)),
|
||||
ThreadPool = threadPool,
|
||||
Scheduler = scheduler,
|
||||
SystemClock = systemClock,
|
||||
DateHeaderValueManager = dateHeaderValueManager,
|
||||
ConnectionManager = connectionManager,
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
{
|
||||
var serviceContext = new TestServiceContext();
|
||||
serviceContext.ServerOptions.Limits.MaxResponseBufferSize = maxResponseBufferSize;
|
||||
serviceContext.ThreadPool = new LoggingThreadPool(null);
|
||||
serviceContext.Scheduler = PipeScheduler.ThreadPool;
|
||||
|
||||
var mockScheduler = Mock.Of<PipeScheduler>();
|
||||
var outputPipeOptions = ConnectionHandler.GetOutputPipeOptions(serviceContext, KestrelMemoryPool.Create(), readerScheduler: mockScheduler);
|
||||
|
|
@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
Assert.Equal(expectedMaximumSizeLow, outputPipeOptions.ResumeWriterThreshold);
|
||||
Assert.Equal(expectedMaximumSizeHigh, outputPipeOptions.PauseWriterThreshold);
|
||||
Assert.Same(mockScheduler, outputPipeOptions.ReaderScheduler);
|
||||
Assert.Same(serviceContext.ThreadPool, outputPipeOptions.WriterScheduler);
|
||||
Assert.Same(serviceContext.Scheduler, outputPipeOptions.WriterScheduler);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
|
|
@ -41,14 +41,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
{
|
||||
var serviceContext = new TestServiceContext();
|
||||
serviceContext.ServerOptions.Limits.MaxRequestBufferSize = maxRequestBufferSize;
|
||||
serviceContext.ThreadPool = new LoggingThreadPool(null);
|
||||
serviceContext.Scheduler = PipeScheduler.ThreadPool;
|
||||
|
||||
var mockScheduler = Mock.Of<PipeScheduler>();
|
||||
var inputPipeOptions = ConnectionHandler.GetInputPipeOptions(serviceContext, KestrelMemoryPool.Create(), writerScheduler: mockScheduler);
|
||||
|
||||
Assert.Equal(expectedMaximumSizeLow, inputPipeOptions.ResumeWriterThreshold);
|
||||
Assert.Equal(expectedMaximumSizeHigh, inputPipeOptions.PauseWriterThreshold);
|
||||
Assert.Same(serviceContext.ThreadPool, inputPipeOptions.ReaderScheduler);
|
||||
Assert.Same(serviceContext.Scheduler, inputPipeOptions.ReaderScheduler);
|
||||
Assert.Same(mockScheduler, inputPipeOptions.WriterScheduler);
|
||||
}
|
||||
|
||||
|
|
@ -67,7 +67,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
|
||||
Assert.Equal(expectedMaximumSizeLow, connectionLifetime.AdaptedInputPipeOptions.ResumeWriterThreshold);
|
||||
Assert.Equal(expectedMaximumSizeHigh, connectionLifetime.AdaptedInputPipeOptions.PauseWriterThreshold);
|
||||
Assert.Same(serviceContext.ThreadPool, connectionLifetime.AdaptedInputPipeOptions.ReaderScheduler);
|
||||
Assert.Same(serviceContext.Scheduler, connectionLifetime.AdaptedInputPipeOptions.ReaderScheduler);
|
||||
Assert.Same(PipeScheduler.Inline, connectionLifetime.AdaptedInputPipeOptions.WriterScheduler);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ using System.Collections.Concurrent;
|
|||
using System.Diagnostics;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
|
|
@ -997,7 +998,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
{
|
||||
var testContext = new TestServiceContext();
|
||||
// FIN callbacks are scheduled so run inline to make this test more reliable
|
||||
testContext.ThreadPool = new InlineLoggingThreadPool(testContext.Log);
|
||||
testContext.Scheduler = PipeScheduler.Inline;
|
||||
|
||||
using (var server = new TestServer(TestApp.EchoAppChunked, testContext, listenOptions))
|
||||
{
|
||||
|
|
|
|||
|
|
@ -722,7 +722,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
var serviceContext = new TestServiceContext
|
||||
{
|
||||
Log = new TestKestrelTrace(logger),
|
||||
ThreadPool = new InlineLoggingThreadPool(new TestKestrelTrace(logger))
|
||||
Scheduler = PipeScheduler.Inline
|
||||
};
|
||||
var transportContext = new TestLibuvTransportContext { Log = new LibuvTrace(logger) };
|
||||
|
||||
|
|
|
|||
|
|
@ -1,16 +1,19 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers;
|
||||
using Microsoft.AspNetCore.Testing;
|
||||
using Microsoft.AspNetCore.Testing.xunit;
|
||||
|
|
@ -128,6 +131,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
}
|
||||
|
||||
await transport.UnbindAsync();
|
||||
|
||||
if (!await serviceContext.ConnectionManager.CloseAllConnectionsAsync(default).ConfigureAwait(false))
|
||||
{
|
||||
await serviceContext.ConnectionManager.AbortAllConnectionsAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
await transport.StopAsync();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
{
|
||||
DateHeaderValueManager = serviceContextPrimary.DateHeaderValueManager,
|
||||
ServerOptions = serviceContextPrimary.ServerOptions,
|
||||
ThreadPool = serviceContextPrimary.ThreadPool,
|
||||
Scheduler = serviceContextPrimary.Scheduler,
|
||||
HttpParser = serviceContextPrimary.HttpParser,
|
||||
};
|
||||
var builderSecondary = new ConnectionBuilder();
|
||||
|
|
@ -220,7 +220,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
{
|
||||
DateHeaderValueManager = serviceContextPrimary.DateHeaderValueManager,
|
||||
ServerOptions = serviceContextPrimary.ServerOptions,
|
||||
ThreadPool = serviceContextPrimary.ThreadPool,
|
||||
Scheduler = serviceContextPrimary.Scheduler,
|
||||
HttpParser = serviceContextPrimary.HttpParser,
|
||||
};
|
||||
var builderSecondary = new ConnectionBuilder();
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.IO.Pipelines;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
||||
|
|
@ -34,7 +35,7 @@ namespace Microsoft.AspNetCore.Testing
|
|||
{
|
||||
LoggerFactory = loggerFactory;
|
||||
Log = kestrelTrace;
|
||||
ThreadPool = new LoggingThreadPool(Log);
|
||||
Scheduler = PipeScheduler.ThreadPool;
|
||||
SystemClock = new MockSystemClock();
|
||||
DateHeaderValueManager = new DateHeaderValueManager(SystemClock);
|
||||
ConnectionManager = new HttpConnectionManager(Log, ResourceCounter.Unlimited);
|
||||
|
|
|
|||
Loading…
Reference in New Issue