Worker based request processing. A static number of workers are used to listen to requests

Signed-off-by: Shih-Ying Hsu <shhsu@microsoft.com>
This commit is contained in:
Shih-Ying Hsu 2014-03-04 09:26:35 -08:00
parent c6c5dd6fbf
commit c9b60c13e4
3 changed files with 144 additions and 43 deletions

View File

@ -0,0 +1,100 @@
//------------------------------------------------------------------------------
// <copyright file="HttpListener.cs" company="Microsoft">
// Copyright (c) Microsoft Corporation. All rights reserved.
// </copyright>
//------------------------------------------------------------------------------
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.WebListener
{
/// <summary>
/// Awaitable object that acts like a semaphore. The object would wait if more than maxConcurrent number of clients waits on it
/// </summary>
public class AwaitableThrottle
{
private static readonly TaskAwaiter<bool> CompletedAwaiter = Task.FromResult(true).GetAwaiter();
private int _maxConcurrent;
private readonly object _thislock;
private readonly Queue<TaskCompletionSource<bool>> _awaiters;
private int _count;
/// <summary>
/// Constructor
/// </summary>
/// <param name="maxConcurrent">maximum number of clients that can wait on this object at the same time</param>
public AwaitableThrottle(int maxConcurrent)
{
_thislock = new object();
_awaiters = new Queue<TaskCompletionSource<bool>>();
_maxConcurrent = maxConcurrent;
}
/// <summary>
/// Maximum amount of clients who can await on this throttle
/// </summary>
public int MaxConcurrent
{
get
{
return _maxConcurrent;
}
set
{
// Note: non-thread safe
_maxConcurrent = value;
}
}
/// <summary>
/// Called by framework
/// </summary>
public TaskAwaiter<bool> GetAwaiter()
{
TaskCompletionSource<bool> awaiter;
lock (_thislock)
{
if (_count < _maxConcurrent)
{
_count++;
return CompletedAwaiter;
}
awaiter = new TaskCompletionSource<bool>();
_awaiters.Enqueue(awaiter);
}
return awaiter.Task.GetAwaiter();
}
/// <summary>
/// Release throttle
/// </summary>
public void Release()
{
TaskCompletionSource<bool> completion = null;
lock (_thislock)
{
if (_awaiters.Count > 0)
{
completion = _awaiters.Dequeue();
}
else
{
_count--;
}
}
if (completion != null)
{
completion.SetResult(true);
}
}
}
}

View File

@ -78,9 +78,9 @@ namespace Microsoft.AspNet.Server.WebListener
private List<Prefix> _uriPrefixes = new List<Prefix>();
private PumpLimits _pumpLimits;
private int _currentOutstandingAccepts;
private int _currentOutstandingRequests;
private Action _offloadListenForNextRequest;
private int _acceptorCounts;
private Action<object> _processRequest;
private readonly AwaitableThrottle _requestProcessingThrottle;
// The native request queue
private long? _requestQueueLength;
@ -102,9 +102,10 @@ namespace Microsoft.AspNet.Server.WebListener
_authManager = new AuthenticationManager(this);
_connectionCancellationTokens = new ConcurrentDictionary<ulong, ConnectionCancellation>();
_offloadListenForNextRequest = new Action(ListenForNextRequestAsync);
_processRequest = new Action<object>(ProcessRequestAsync);
_pumpLimits = new PumpLimits(DefaultMaxAccepts, DefaultMaxRequests);
_requestProcessingThrottle = new AwaitableThrottle(DefaultMaxRequests);
}
internal enum State
@ -189,16 +190,6 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
private bool CanAcceptMoreRequests
{
get
{
PumpLimits limits = _pumpLimits;
return (_currentOutstandingAccepts < limits.MaxOutstandingAccepts
&& _currentOutstandingRequests < limits.MaxOutstandingRequests - _currentOutstandingAccepts);
}
}
/// <summary>
/// These are merged as one operation because they should be swapped out atomically.
/// This controls how many requests the server attempts to process concurrently.
@ -209,8 +200,20 @@ namespace Microsoft.AspNet.Server.WebListener
{
_pumpLimits = new PumpLimits(maxAccepts, maxRequests);
// Kick the pump in case we went from zero to non-zero limits.
OffloadListenForNextRequestAsync();
if (_state == State.Started)
{
ActivateRequestProcessingLimits();
}
}
private void ActivateRequestProcessingLimits()
{
_requestProcessingThrottle.MaxConcurrent = _pumpLimits.MaxOutstandingRequests;
for (int i = _acceptorCounts; i < _pumpLimits.MaxOutstandingAccepts; i++)
{
ProcessRequestsWorker();
}
}
/// <summary>
@ -454,7 +457,7 @@ namespace Microsoft.AspNet.Server.WebListener
SetRequestQueueLimit();
OffloadListenForNextRequestAsync();
ActivateRequestProcessingLimits();
}
catch (Exception exception)
{
@ -468,50 +471,45 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
// Make sure the next request is processed on another thread as to not recursively
// block the request we just received.
private void OffloadListenForNextRequestAsync()
{
if (IsListening && CanAcceptMoreRequests)
{
Task offloadTask = Task.Run(_offloadListenForNextRequest);
}
}
// The message pump.
// When we start listening for the next request on one thread, we may need to be sure that the
// completion continues on another thread as to not block the current request processing.
// The awaits will manage stack depth for us.
private async void ListenForNextRequestAsync()
private async void ProcessRequestsWorker()
{
while (IsListening && CanAcceptMoreRequests)
int workerIndex = Interlocked.Increment(ref _acceptorCounts);
while (IsListening && workerIndex <= _pumpLimits.MaxOutstandingAccepts)
{
await _requestProcessingThrottle;
// Receive a request
RequestContext requestContext;
Interlocked.Increment(ref _currentOutstandingAccepts);
try
{
requestContext = await GetContextAsync().SupressContext();
Interlocked.Decrement(ref _currentOutstandingAccepts);
}
catch (Exception exception)
{
LogHelper.LogException(_logger, "ListenForNextRequestAsync", exception);
// Assume the server has stopped.
Interlocked.Decrement(ref _currentOutstandingAccepts);
Contract.Assert(!IsListening);
return;
}
Interlocked.Increment(ref _currentOutstandingRequests);
OffloadListenForNextRequestAsync();
await ProcessRequestAsync(requestContext).SupressContext();
Interlocked.Decrement(ref _currentOutstandingRequests);
try
{
Task.Factory.StartNew(_processRequest, requestContext);
}
catch (Exception ex)
{
LogHelper.LogException(_logger, "ProcessRequestAsync", ex);
_requestProcessingThrottle.Release();
}
}
Interlocked.Decrement(ref _acceptorCounts);
}
private async Task ProcessRequestAsync(RequestContext requestContext)
private async void ProcessRequestAsync(object requestContextObj)
{
var requestContext = requestContextObj as RequestContext;
try
{
try
@ -543,6 +541,10 @@ namespace Microsoft.AspNet.Server.WebListener
requestContext.Abort();
requestContext.Dispose();
}
finally
{
_requestProcessingThrottle.Release();
}
}
private void CleanupV2Config()

View File

@ -16,7 +16,7 @@
namespace Microsoft.AspNet.Server.WebListener
{
internal class PumpLimits
internal struct PumpLimits
{
internal PumpLimits(int maxAccepts, int maxRequests)
{
@ -24,8 +24,7 @@ namespace Microsoft.AspNet.Server.WebListener
MaxOutstandingRequests = maxRequests;
}
internal int MaxOutstandingAccepts { get; private set; }
internal int MaxOutstandingRequests { get; private set; }
internal readonly int MaxOutstandingAccepts;
internal readonly int MaxOutstandingRequests;
}
}