diff --git a/src/Microsoft.AspNet.Server.WebListener/AwaitableThrottle.cs b/src/Microsoft.AspNet.Server.WebListener/AwaitableThrottle.cs new file mode 100644 index 0000000000..dc0baa555f --- /dev/null +++ b/src/Microsoft.AspNet.Server.WebListener/AwaitableThrottle.cs @@ -0,0 +1,100 @@ +//------------------------------------------------------------------------------ +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// +//------------------------------------------------------------------------------ + +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.WebListener +{ + /// + /// Awaitable object that acts like a semaphore. The object would wait if more than maxConcurrent number of clients waits on it + /// + public class AwaitableThrottle + { + private static readonly TaskAwaiter CompletedAwaiter = Task.FromResult(true).GetAwaiter(); + + private int _maxConcurrent; + private readonly object _thislock; + private readonly Queue> _awaiters; + + private int _count; + + /// + /// Constructor + /// + /// maximum number of clients that can wait on this object at the same time + public AwaitableThrottle(int maxConcurrent) + { + _thislock = new object(); + _awaiters = new Queue>(); + _maxConcurrent = maxConcurrent; + } + + /// + /// Maximum amount of clients who can await on this throttle + /// + public int MaxConcurrent + { + get + { + return _maxConcurrent; + } + set + { + // Note: non-thread safe + _maxConcurrent = value; + } + } + + /// + /// Called by framework + /// + public TaskAwaiter GetAwaiter() + { + TaskCompletionSource awaiter; + + lock (_thislock) + { + if (_count < _maxConcurrent) + { + _count++; + return CompletedAwaiter; + } + + awaiter = new TaskCompletionSource(); + _awaiters.Enqueue(awaiter); + } + + return awaiter.Task.GetAwaiter(); + } + + /// + /// Release throttle + /// + public void Release() + { + TaskCompletionSource completion = null; + + lock (_thislock) + { + if (_awaiters.Count > 0) + { + completion = _awaiters.Dequeue(); + } + else + { + _count--; + } + } + + if (completion != null) + { + completion.SetResult(true); + } + } + } +} diff --git a/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs b/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs index 4220f1c4fc..f9ed3fda72 100644 --- a/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs +++ b/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs @@ -78,9 +78,9 @@ namespace Microsoft.AspNet.Server.WebListener private List _uriPrefixes = new List(); private PumpLimits _pumpLimits; - private int _currentOutstandingAccepts; - private int _currentOutstandingRequests; - private Action _offloadListenForNextRequest; + private int _acceptorCounts; + private Action _processRequest; + private readonly AwaitableThrottle _requestProcessingThrottle; // The native request queue private long? _requestQueueLength; @@ -102,9 +102,10 @@ namespace Microsoft.AspNet.Server.WebListener _authManager = new AuthenticationManager(this); _connectionCancellationTokens = new ConcurrentDictionary(); - _offloadListenForNextRequest = new Action(ListenForNextRequestAsync); + _processRequest = new Action(ProcessRequestAsync); _pumpLimits = new PumpLimits(DefaultMaxAccepts, DefaultMaxRequests); + _requestProcessingThrottle = new AwaitableThrottle(DefaultMaxRequests); } internal enum State @@ -189,16 +190,6 @@ namespace Microsoft.AspNet.Server.WebListener } } - private bool CanAcceptMoreRequests - { - get - { - PumpLimits limits = _pumpLimits; - return (_currentOutstandingAccepts < limits.MaxOutstandingAccepts - && _currentOutstandingRequests < limits.MaxOutstandingRequests - _currentOutstandingAccepts); - } - } - /// /// These are merged as one operation because they should be swapped out atomically. /// This controls how many requests the server attempts to process concurrently. @@ -209,8 +200,20 @@ namespace Microsoft.AspNet.Server.WebListener { _pumpLimits = new PumpLimits(maxAccepts, maxRequests); - // Kick the pump in case we went from zero to non-zero limits. - OffloadListenForNextRequestAsync(); + if (_state == State.Started) + { + ActivateRequestProcessingLimits(); + } + } + + private void ActivateRequestProcessingLimits() + { + _requestProcessingThrottle.MaxConcurrent = _pumpLimits.MaxOutstandingRequests; + + for (int i = _acceptorCounts; i < _pumpLimits.MaxOutstandingAccepts; i++) + { + ProcessRequestsWorker(); + } } /// @@ -454,7 +457,7 @@ namespace Microsoft.AspNet.Server.WebListener SetRequestQueueLimit(); - OffloadListenForNextRequestAsync(); + ActivateRequestProcessingLimits(); } catch (Exception exception) { @@ -468,50 +471,45 @@ namespace Microsoft.AspNet.Server.WebListener } } - // Make sure the next request is processed on another thread as to not recursively - // block the request we just received. - private void OffloadListenForNextRequestAsync() - { - if (IsListening && CanAcceptMoreRequests) - { - Task offloadTask = Task.Run(_offloadListenForNextRequest); - } - } - // The message pump. // When we start listening for the next request on one thread, we may need to be sure that the // completion continues on another thread as to not block the current request processing. // The awaits will manage stack depth for us. - private async void ListenForNextRequestAsync() + private async void ProcessRequestsWorker() { - while (IsListening && CanAcceptMoreRequests) + int workerIndex = Interlocked.Increment(ref _acceptorCounts); + while (IsListening && workerIndex <= _pumpLimits.MaxOutstandingAccepts) { + await _requestProcessingThrottle; + // Receive a request RequestContext requestContext; - Interlocked.Increment(ref _currentOutstandingAccepts); try { requestContext = await GetContextAsync().SupressContext(); - Interlocked.Decrement(ref _currentOutstandingAccepts); } catch (Exception exception) { LogHelper.LogException(_logger, "ListenForNextRequestAsync", exception); - // Assume the server has stopped. - Interlocked.Decrement(ref _currentOutstandingAccepts); Contract.Assert(!IsListening); return; } - - Interlocked.Increment(ref _currentOutstandingRequests); - OffloadListenForNextRequestAsync(); - await ProcessRequestAsync(requestContext).SupressContext(); - Interlocked.Decrement(ref _currentOutstandingRequests); + try + { + Task.Factory.StartNew(_processRequest, requestContext); + } + catch (Exception ex) + { + LogHelper.LogException(_logger, "ProcessRequestAsync", ex); + _requestProcessingThrottle.Release(); + } } + Interlocked.Decrement(ref _acceptorCounts); } - private async Task ProcessRequestAsync(RequestContext requestContext) + private async void ProcessRequestAsync(object requestContextObj) { + var requestContext = requestContextObj as RequestContext; try { try @@ -543,6 +541,10 @@ namespace Microsoft.AspNet.Server.WebListener requestContext.Abort(); requestContext.Dispose(); } + finally + { + _requestProcessingThrottle.Release(); + } } private void CleanupV2Config() diff --git a/src/Microsoft.AspNet.Server.WebListener/PumpLimits.cs b/src/Microsoft.AspNet.Server.WebListener/PumpLimits.cs index 28b99c11cc..f0adac342a 100644 --- a/src/Microsoft.AspNet.Server.WebListener/PumpLimits.cs +++ b/src/Microsoft.AspNet.Server.WebListener/PumpLimits.cs @@ -16,7 +16,7 @@ namespace Microsoft.AspNet.Server.WebListener { - internal class PumpLimits + internal struct PumpLimits { internal PumpLimits(int maxAccepts, int maxRequests) { @@ -24,8 +24,7 @@ namespace Microsoft.AspNet.Server.WebListener MaxOutstandingRequests = maxRequests; } - internal int MaxOutstandingAccepts { get; private set; } - - internal int MaxOutstandingRequests { get; private set; } + internal readonly int MaxOutstandingAccepts; + internal readonly int MaxOutstandingRequests; } }