From 0fbf919d505b8ff1708ad29d43c68d4d7fd5c1a5 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 29 Apr 2020 00:06:42 -0700 Subject: [PATCH] Use the RequestContext as the threadpool workitem (#21294) - Avoids a task allocation (reuses the existing RequestContext allocation) - Side benefit is that it's easier to see what things are queued into the threadpool for diagnostics! --- src/Servers/HttpSys/src/MessagePump.cs | 104 ++++-------------- .../src/RequestProcessing/RequestContext.cs | 85 +++++++++++++- 2 files changed, 103 insertions(+), 86 deletions(-) diff --git a/src/Servers/HttpSys/src/MessagePump.cs b/src/Servers/HttpSys/src/MessagePump.cs index f5e90a9876..cc90362e7a 100644 --- a/src/Servers/HttpSys/src/MessagePump.cs +++ b/src/Servers/HttpSys/src/MessagePump.cs @@ -22,11 +22,8 @@ namespace Microsoft.AspNetCore.Server.HttpSys private readonly ILogger _logger; private readonly HttpSysOptions _options; - private IHttpApplication _application; - private int _maxAccepts; private int _acceptorCounts; - private Action _processRequest; private volatile int _stopping; private int _outstandingRequests; @@ -58,15 +55,16 @@ namespace Microsoft.AspNetCore.Server.HttpSys _serverAddresses = new ServerAddressesFeature(); Features.Set(_serverAddresses); - _processRequest = new Action(ProcessRequestAsync); _maxAccepts = _options.MaxAccepts; } internal HttpSysListener Listener { get; } + internal IHttpApplication Application { get; set; } + public IFeatureCollection Features { get; } - private bool Stopping => _stopping == 1; + internal bool Stopping => _stopping == 1; public Task StartAsync(IHttpApplication application, CancellationToken cancellationToken) { @@ -115,11 +113,11 @@ namespace Microsoft.AspNetCore.Server.HttpSys // else // Attaching to an existing queue, don't add a default. // Can't call Start twice - Contract.Assert(_application == null); + Contract.Assert(Application == null); Contract.Assert(application != null); - _application = new ApplicationWrapper(application); + Application = new ApplicationWrapper(application); Listener.Start(); @@ -151,6 +149,21 @@ namespace Microsoft.AspNetCore.Server.HttpSys } } + internal int IncrementOutstandingRequest() + { + return Interlocked.Increment(ref _outstandingRequests); + } + + internal int DecrementOutstandingRequest() + { + return Interlocked.Decrement(ref _outstandingRequests); + } + + internal void SetShutdownSignal() + { + _shutdownSignal.TrySetResult(null); + } + // The message pump. // When we start listening for the next request on one thread, we may need to be sure that the // completion continues on another thread as to not block the current request processing. @@ -165,6 +178,8 @@ namespace Microsoft.AspNetCore.Server.HttpSys try { requestContext = await Listener.AcceptAsync().SupressContext(); + // Assign the message pump to this request context + requestContext.MessagePump = this; } catch (Exception exception) { @@ -181,7 +196,7 @@ namespace Microsoft.AspNetCore.Server.HttpSys } try { - Task ignored = Task.Factory.StartNew(_processRequest, requestContext); + ThreadPool.UnsafeQueueUserWorkItem(requestContext, preferLocal: false); } catch (Exception ex) { @@ -193,79 +208,6 @@ namespace Microsoft.AspNetCore.Server.HttpSys Interlocked.Decrement(ref _acceptorCounts); } - private async void ProcessRequestAsync(object requestContextObj) - { - var requestContext = requestContextObj as RequestContext; - try - { - if (Stopping) - { - SetFatalResponse(requestContext, 503); - return; - } - - object context = null; - Interlocked.Increment(ref _outstandingRequests); - try - { - var featureContext = new FeatureContext(requestContext); - context = _application.CreateContext(featureContext.Features); - try - { - await _application.ProcessRequestAsync(context).SupressContext(); - await featureContext.CompleteAsync(); - } - finally - { - await featureContext.OnCompleted(); - } - _application.DisposeContext(context, null); - requestContext.Dispose(); - } - catch (Exception ex) - { - _logger.LogError(LoggerEventIds.RequestProcessError, ex, "ProcessRequestAsync"); - _application.DisposeContext(context, ex); - if (requestContext.Response.HasStarted) - { - // HTTP/2 INTERNAL_ERROR = 0x2 https://tools.ietf.org/html/rfc7540#section-7 - // Otherwise the default is Cancel = 0x8. - requestContext.SetResetCode(2); - requestContext.Abort(); - } - else - { - // We haven't sent a response yet, try to send a 500 Internal Server Error - requestContext.Response.Headers.IsReadOnly = false; - requestContext.Response.Trailers.IsReadOnly = false; - requestContext.Response.Headers.Clear(); - requestContext.Response.Trailers.Clear(); - SetFatalResponse(requestContext, 500); - } - } - finally - { - if (Interlocked.Decrement(ref _outstandingRequests) == 0 && Stopping) - { - _logger.LogInformation(LoggerEventIds.RequestsDrained, "All requests drained."); - _shutdownSignal.TrySetResult(0); - } - } - } - catch (Exception ex) - { - _logger.LogError(LoggerEventIds.RequestError, ex, "ProcessRequestAsync"); - requestContext.Abort(); - } - } - - private static void SetFatalResponse(RequestContext context, int status) - { - context.Response.StatusCode = status; - context.Response.ContentLength = 0; - context.Dispose(); - } - public Task StopAsync(CancellationToken cancellationToken) { void RegisterCancelation() diff --git a/src/Servers/HttpSys/src/RequestProcessing/RequestContext.cs b/src/Servers/HttpSys/src/RequestProcessing/RequestContext.cs index 605288b397..de0cefb72d 100644 --- a/src/Servers/HttpSys/src/RequestProcessing/RequestContext.cs +++ b/src/Servers/HttpSys/src/RequestProcessing/RequestContext.cs @@ -4,9 +4,7 @@ using System; using System.Diagnostics; using System.IO; -using System.Runtime.CompilerServices; using System.Security.Authentication.ExtendedProtection; -using System.Security.Claims; using System.Security.Principal; using System.Threading; using System.Threading.Tasks; @@ -16,7 +14,7 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.HttpSys { - internal sealed class RequestContext : IDisposable + internal sealed class RequestContext : IDisposable, IThreadPoolWorkItem { private static readonly Action AbortDelegate = Abort; @@ -35,6 +33,8 @@ namespace Microsoft.AspNetCore.Server.HttpSys AllowSynchronousIO = server.Options.AllowSynchronousIO; } + internal MessagePump MessagePump { get; set; } + internal HttpSysListener Server { get; } internal ILogger Logger => Server.Logger; @@ -120,14 +120,14 @@ namespace Microsoft.AspNetCore.Server.HttpSys { if (!Request.IsHttps) { - Logger.LogDebug(LoggerEventIds.ChannelBindingNeedsHttps,"TryGetChannelBinding; Channel binding requires HTTPS."); + Logger.LogDebug(LoggerEventIds.ChannelBindingNeedsHttps, "TryGetChannelBinding; Channel binding requires HTTPS."); return false; } value = ClientCertLoader.GetChannelBindingFromTls(Server.RequestQueue, Request.UConnectionId, Logger); Debug.Assert(value != null, "GetChannelBindingFromTls returned null even though OS supposedly supports Extended Protection"); - Logger.LogDebug(LoggerEventIds.ChannelBindingRetrived,"Channel binding retrieved."); + Logger.LogDebug(LoggerEventIds.ChannelBindingRetrived, "Channel binding retrieved."); return value != null; } @@ -239,5 +239,80 @@ namespace Microsoft.AspNetCore.Server.HttpSys // RequestQueueHandle may have been closed } } + + public async void Execute() + { + var messagePump = MessagePump; + var application = messagePump.Application; + + try + { + if (messagePump.Stopping) + { + SetFatalResponse(503); + return; + } + + object context = null; + messagePump.IncrementOutstandingRequest(); + try + { + var featureContext = new FeatureContext(this); + context = application.CreateContext(featureContext.Features); + try + { + await application.ProcessRequestAsync(context).SupressContext(); + await featureContext.CompleteAsync(); + } + finally + { + await featureContext.OnCompleted(); + } + application.DisposeContext(context, null); + Dispose(); + } + catch (Exception ex) + { + Logger.LogError(LoggerEventIds.RequestProcessError, ex, "ProcessRequestAsync"); + application.DisposeContext(context, ex); + if (Response.HasStarted) + { + // HTTP/2 INTERNAL_ERROR = 0x2 https://tools.ietf.org/html/rfc7540#section-7 + // Otherwise the default is Cancel = 0x8. + SetResetCode(2); + Abort(); + } + else + { + // We haven't sent a response yet, try to send a 500 Internal Server Error + Response.Headers.IsReadOnly = false; + Response.Trailers.IsReadOnly = false; + Response.Headers.Clear(); + Response.Trailers.Clear(); + SetFatalResponse(500); + } + } + finally + { + if (messagePump.DecrementOutstandingRequest() == 0 && messagePump.Stopping) + { + Logger.LogInformation(LoggerEventIds.RequestsDrained, "All requests drained."); + messagePump.SetShutdownSignal(); + } + } + } + catch (Exception ex) + { + Logger.LogError(LoggerEventIds.RequestError, ex, "ProcessRequestAsync"); + Abort(); + } + } + + private void SetFatalResponse(int status) + { + Response.StatusCode = status; + Response.ContentLength = 0; + Dispose(); + } } }