Dylan/queue limit (#10590)
* Added queue limit, extra requests are 503'd * Combined counters into an atomic `TotalRequests` * small safety fix * Updated log messages; simplified flow
This commit is contained in:
parent
993d943aec
commit
bd4b843678
|
|
@ -20,5 +20,6 @@ namespace Microsoft.AspNetCore.RequestThrottling
|
|||
{
|
||||
public RequestThrottlingOptions() { }
|
||||
public int? MaxConcurrentRequests { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
|
||||
public int RequestQueueLimit { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ namespace RequestThrottlingSample
|
|||
services.Configure<RequestThrottlingOptions>(options =>
|
||||
{
|
||||
options.MaxConcurrentRequests = 2;
|
||||
options.RequestQueueLimit = 0;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -31,7 +32,7 @@ namespace RequestThrottlingSample
|
|||
|
||||
app.Run(async context =>
|
||||
{
|
||||
await context.Response.WriteAsync("Hello Request Throttling! <p></p>");
|
||||
await context.Response.WriteAsync("Hello Request Throttling! If you refresh this page a bunch, it will 503.");
|
||||
await Task.Delay(1000);
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,56 +9,54 @@ namespace Microsoft.AspNetCore.RequestThrottling.Internal
|
|||
{
|
||||
internal class RequestQueue : IDisposable
|
||||
{
|
||||
private SemaphoreSlim _semaphore;
|
||||
private object _waitingRequestsLock = new object();
|
||||
public readonly int MaxConcurrentRequests;
|
||||
public int WaitingRequests { get; private set; }
|
||||
private readonly int _maxConcurrentRequests;
|
||||
private readonly int _requestQueueLimit;
|
||||
private readonly SemaphoreSlim _serverSemaphore;
|
||||
|
||||
public RequestQueue(int maxConcurrentRequests)
|
||||
private object _totalRequestsLock = new object();
|
||||
public int TotalRequests { get; private set; }
|
||||
|
||||
public RequestQueue(int maxConcurrentRequests, int requestQueueLimit)
|
||||
{
|
||||
MaxConcurrentRequests = maxConcurrentRequests;
|
||||
_semaphore = new SemaphoreSlim(maxConcurrentRequests);
|
||||
_maxConcurrentRequests = maxConcurrentRequests;
|
||||
_requestQueueLimit = requestQueueLimit;
|
||||
_serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
|
||||
}
|
||||
|
||||
public async Task EnterQueue()
|
||||
public async Task<bool> TryEnterQueueAsync()
|
||||
{
|
||||
var waitInQueueTask = _semaphore.WaitAsync();
|
||||
// a return value of 'false' indicates that the request is rejected
|
||||
// a return value of 'true' indicates that the request may proceed
|
||||
// _serverSemaphore.Release is *not* called in this method, it is called externally when requests leave the server
|
||||
|
||||
var needsToWaitOnQueue = !waitInQueueTask.IsCompletedSuccessfully;
|
||||
if (needsToWaitOnQueue)
|
||||
lock (_totalRequestsLock)
|
||||
{
|
||||
lock (_waitingRequestsLock)
|
||||
if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
|
||||
{
|
||||
WaitingRequests++;
|
||||
return false;
|
||||
}
|
||||
|
||||
await waitInQueueTask;
|
||||
|
||||
lock (_waitingRequestsLock)
|
||||
{
|
||||
WaitingRequests--;
|
||||
}
|
||||
TotalRequests++;
|
||||
}
|
||||
|
||||
await _serverSemaphore.WaitAsync();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public void Release()
|
||||
{
|
||||
_semaphore.Release();
|
||||
}
|
||||
_serverSemaphore.Release();
|
||||
|
||||
public int Count
|
||||
{
|
||||
get => _semaphore.CurrentCount;
|
||||
}
|
||||
|
||||
public int ConcurrentRequests
|
||||
{
|
||||
get => MaxConcurrentRequests - _semaphore.CurrentCount;
|
||||
lock (_totalRequestsLock)
|
||||
{
|
||||
TotalRequests--;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_semaphore.Dispose();
|
||||
_serverSemaphore.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.RequestThrottling.Internal;
|
||||
|
|
@ -16,7 +17,6 @@ namespace Microsoft.AspNetCore.RequestThrottling
|
|||
public class RequestThrottlingMiddleware
|
||||
{
|
||||
private readonly RequestQueue _requestQueue;
|
||||
private readonly RequestThrottlingOptions _options;
|
||||
private readonly RequestDelegate _next;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
|
|
@ -32,11 +32,16 @@ namespace Microsoft.AspNetCore.RequestThrottling
|
|||
{
|
||||
throw new ArgumentException("The value of 'options.MaxConcurrentRequests' must be specified.", nameof(options));
|
||||
}
|
||||
if (options.Value.RequestQueueLimit < 0)
|
||||
{
|
||||
throw new ArgumentException("The value of 'options.RequestQueueLimit' must be a positive integer.", nameof(options));
|
||||
}
|
||||
|
||||
_next = next;
|
||||
_logger = loggerFactory.CreateLogger<RequestThrottlingMiddleware>();
|
||||
_options = options.Value;
|
||||
_requestQueue = new RequestQueue(_options.MaxConcurrentRequests.Value);
|
||||
_requestQueue = new RequestQueue(
|
||||
options.Value.MaxConcurrentRequests.Value,
|
||||
options.Value.RequestQueueLimit);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -46,17 +51,24 @@ namespace Microsoft.AspNetCore.RequestThrottling
|
|||
/// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
|
||||
public async Task Invoke(HttpContext context)
|
||||
{
|
||||
var waitInQueueTask = _requestQueue.EnterQueue();
|
||||
|
||||
if (waitInQueueTask.IsCompletedSuccessfully)
|
||||
var waitInQueueTask = _requestQueue.TryEnterQueueAsync();
|
||||
if (waitInQueueTask.IsCompletedSuccessfully && !waitInQueueTask.Result)
|
||||
{
|
||||
RequestThrottlingLog.RequestRunImmediately(_logger);
|
||||
RequestThrottlingLog.RequestRejectedQueueFull(_logger);
|
||||
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
|
||||
return;
|
||||
}
|
||||
else if (!waitInQueueTask.IsCompletedSuccessfully)
|
||||
{
|
||||
RequestThrottlingLog.RequestEnqueued(_logger, ActiveRequestCount);
|
||||
var result = await waitInQueueTask;
|
||||
RequestThrottlingLog.RequestDequeued(_logger, ActiveRequestCount);
|
||||
|
||||
Debug.Assert(result);
|
||||
}
|
||||
else
|
||||
{
|
||||
RequestThrottlingLog.RequestEnqueued(_logger, WaitingRequests);
|
||||
await waitInQueueTask;
|
||||
RequestThrottlingLog.RequestDequeued(_logger, WaitingRequests);
|
||||
RequestThrottlingLog.RequestRunImmediately(_logger, ActiveRequestCount);
|
||||
}
|
||||
|
||||
try
|
||||
|
|
@ -70,46 +82,48 @@ namespace Microsoft.AspNetCore.RequestThrottling
|
|||
}
|
||||
|
||||
/// <summary>
|
||||
/// The number of live requests that are downstream from this middleware.
|
||||
/// Cannot exceeed <see cref="RequestThrottlingOptions.MaxConcurrentRequests"/>.
|
||||
/// The number of requests currently on the server.
|
||||
/// Cannot exceeed the sum of <see cref="RequestThrottlingOptions.RequestQueueLimit"> and </see>/><see cref="RequestThrottlingOptions.MaxConcurrentRequests"/>.
|
||||
/// </summary>
|
||||
internal int ConcurrentRequests
|
||||
internal int ActiveRequestCount
|
||||
{
|
||||
get => _requestQueue.ConcurrentRequests;
|
||||
get => _requestQueue.TotalRequests;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Number of requests currently enqueued and waiting to be processed.
|
||||
/// </summary>
|
||||
internal int WaitingRequests
|
||||
{
|
||||
get => _requestQueue.WaitingRequests;
|
||||
}
|
||||
// TODO :: update log wording to reflect the changes
|
||||
|
||||
private static class RequestThrottlingLog
|
||||
{
|
||||
private static readonly Action<ILogger, int, Exception> _requestEnqueued =
|
||||
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(1, "RequestEnqueued"), "Concurrent request limit reached, queuing request. Current queue length: {QueuedRequests}.");
|
||||
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(1, "RequestEnqueued"), "MaxConcurrentRequests limit reached, request has been queued. Current active requests: {ActiveRequests}.");
|
||||
|
||||
private static readonly Action<ILogger, int, Exception> _requestDequeued =
|
||||
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(2, "RequestDequeued"), "Request dequeued. Current queue length: {QueuedRequests}.");
|
||||
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(2, "RequestDequeued"), "Request dequeued. Current active requests: {ActiveRequests}.");
|
||||
|
||||
private static readonly Action<ILogger, Exception> _requestRunImmediately =
|
||||
LoggerMessage.Define(LogLevel.Debug, new EventId(3, "RequestRunImmediately"), "Concurrent request limit has not been reached, running request immediately.");
|
||||
private static readonly Action<ILogger, int, Exception> _requestRunImmediately =
|
||||
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(3, "RequestRunImmediately"), "Below MaxConcurrentRequests limit, running request immediately. Current active requests: {ActiveRequests}");
|
||||
|
||||
internal static void RequestEnqueued(ILogger logger, int queuedRequests)
|
||||
private static readonly Action<ILogger, Exception> _requestRejectedQueueFull =
|
||||
LoggerMessage.Define(LogLevel.Debug, new EventId(4, "RequestRejectedQueueFull"), "Currently at the 'RequestQueueLimit', rejecting this request with a '503 server not availible' error");
|
||||
|
||||
internal static void RequestEnqueued(ILogger logger, int activeRequests)
|
||||
{
|
||||
_requestEnqueued(logger, queuedRequests, null);
|
||||
_requestEnqueued(logger, activeRequests, null);
|
||||
}
|
||||
|
||||
internal static void RequestDequeued(ILogger logger, int queuedRequests)
|
||||
internal static void RequestDequeued(ILogger logger, int activeRequests)
|
||||
{
|
||||
_requestDequeued(logger, queuedRequests, null);
|
||||
_requestDequeued(logger, activeRequests, null);
|
||||
}
|
||||
|
||||
internal static void RequestRunImmediately(ILogger logger)
|
||||
internal static void RequestRunImmediately(ILogger logger, int activeRequests)
|
||||
{
|
||||
_requestRunImmediately(logger, null);
|
||||
_requestRunImmediately(logger, activeRequests, null);
|
||||
}
|
||||
|
||||
internal static void RequestRejectedQueueFull(ILogger logger)
|
||||
{
|
||||
_requestRejectedQueueFull(logger, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,5 +15,12 @@ namespace Microsoft.AspNetCore.RequestThrottling
|
|||
/// This is null by default because the correct value is application specific. This option must be configured by the application.
|
||||
/// </summary>
|
||||
public int? MaxConcurrentRequests { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Maximum number of queued requests before the server starts rejecting connections with '503 Service Unavailible'.
|
||||
/// Setting this value to 0 will disable the queue; all requests will either immediately enter the server or be rejected.
|
||||
/// Defaults to 5000 queued requests.
|
||||
/// </summary>
|
||||
public int RequestQueueLimit { get; set; } = 5000;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
|||
[Fact]
|
||||
public async Task RequestsCanEnterIfSpaceAvailible()
|
||||
{
|
||||
var middleware = TestUtils.CreateTestMiddleWare(maxConcurrentRequests: 1);
|
||||
var middleware = TestUtils.CreateTestMiddleware(maxConcurrentRequests: 1);
|
||||
var context = new DefaultHttpContext();
|
||||
|
||||
// a request should go through with no problems
|
||||
|
|
@ -24,18 +24,18 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
|||
[Fact]
|
||||
public async Task SemaphoreStatePreservedIfRequestsError()
|
||||
{
|
||||
var middleware = TestUtils.CreateTestMiddleWare(
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
maxConcurrentRequests: 1,
|
||||
next: httpContext =>
|
||||
{
|
||||
throw new DivideByZeroException();
|
||||
});
|
||||
|
||||
Assert.Equal(0, middleware.ConcurrentRequests);
|
||||
Assert.Equal(0, middleware.ActiveRequestCount);
|
||||
|
||||
await Assert.ThrowsAsync<DivideByZeroException>(() => middleware.Invoke(new DefaultHttpContext()));
|
||||
|
||||
Assert.Equal(0, middleware.ConcurrentRequests);
|
||||
Assert.Equal(0, middleware.ActiveRequestCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
|
@ -44,7 +44,7 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
|||
var blocker = new SyncPoint();
|
||||
var firstRequest = true;
|
||||
|
||||
var middleware = TestUtils.CreateTestMiddleWare(
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
maxConcurrentRequests: 1,
|
||||
next: httpContext =>
|
||||
{
|
||||
|
|
@ -58,14 +58,12 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
|||
|
||||
// t1 (as the first request) is blocked by the tcs blocker
|
||||
var t1 = middleware.Invoke(new DefaultHttpContext());
|
||||
Assert.Equal(1, middleware.ConcurrentRequests);
|
||||
Assert.Equal(0, middleware.WaitingRequests);
|
||||
Assert.Equal(1, middleware.ActiveRequestCount);
|
||||
|
||||
// t2 is blocked from entering the server since t1 already exists there
|
||||
// note: increasing MaxConcurrentRequests would allow t2 through while t1 is blocked
|
||||
var t2 = middleware.Invoke(new DefaultHttpContext());
|
||||
Assert.Equal(1, middleware.ConcurrentRequests);
|
||||
Assert.Equal(1, middleware.WaitingRequests);
|
||||
Assert.Equal(2, middleware.ActiveRequestCount);
|
||||
|
||||
// unblock the first task, and the second should follow
|
||||
blocker.Continue();
|
||||
|
|
@ -78,9 +76,56 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
|||
{
|
||||
var ex = Assert.Throws<ArgumentException>(() =>
|
||||
{
|
||||
TestUtils.CreateTestMiddleWare(maxConcurrentRequests: null);
|
||||
TestUtils.CreateTestMiddleware(maxConcurrentRequests: null);
|
||||
});
|
||||
Assert.Equal("options", ex.ParamName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async void RequestsBlockedIfQueueFull()
|
||||
{
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
maxConcurrentRequests: 0,
|
||||
requestQueueLimit: 0,
|
||||
next: httpContext =>
|
||||
{
|
||||
// throttle should bounce the request; it should never get here
|
||||
throw new NotImplementedException();
|
||||
});
|
||||
|
||||
await middleware.Invoke(new DefaultHttpContext());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async void FullQueueResultsIn503Error()
|
||||
{
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
maxConcurrentRequests: 0,
|
||||
requestQueueLimit: 0);
|
||||
|
||||
var context = new DefaultHttpContext();
|
||||
await middleware.Invoke(context);
|
||||
Assert.Equal(503, context.Response.StatusCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MultipleRequestsFillUpQueue()
|
||||
{
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
maxConcurrentRequests: 0,
|
||||
requestQueueLimit: 10,
|
||||
next: httpContext =>
|
||||
{
|
||||
return Task.Delay(TimeSpan.FromSeconds(30));
|
||||
});
|
||||
|
||||
Assert.Equal(0, middleware.ActiveRequestCount);
|
||||
|
||||
var _ = middleware.Invoke(new DefaultHttpContext());
|
||||
Assert.Equal(1, middleware.ActiveRequestCount);
|
||||
|
||||
_ = middleware.Invoke(new DefaultHttpContext());
|
||||
Assert.Equal(2, middleware.ActiveRequestCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,69 +12,52 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
|||
[Fact]
|
||||
public async Task LimitsIncomingRequests()
|
||||
{
|
||||
using var s = new RequestQueue(1);
|
||||
Assert.Equal(1, s.Count);
|
||||
using var s = TestUtils.CreateRequestQueue(1);
|
||||
Assert.Equal(0, s.TotalRequests);
|
||||
|
||||
await s.EnterQueue().OrTimeout();
|
||||
Assert.Equal(0, s.Count);
|
||||
Assert.True(await s.TryEnterQueueAsync().OrTimeout());
|
||||
Assert.Equal(1, s.TotalRequests);
|
||||
|
||||
s.Release();
|
||||
Assert.Equal(1, s.Count);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TracksQueueLength()
|
||||
{
|
||||
using var s = new RequestQueue(1);
|
||||
Assert.Equal(0, s.WaitingRequests);
|
||||
|
||||
await s.EnterQueue();
|
||||
Assert.Equal(0, s.WaitingRequests);
|
||||
|
||||
var enterQueueTask = s.EnterQueue();
|
||||
Assert.Equal(1, s.WaitingRequests);
|
||||
|
||||
s.Release();
|
||||
await enterQueueTask;
|
||||
Assert.Equal(0, s.WaitingRequests);
|
||||
Assert.Equal(0, s.TotalRequests);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DoesNotWaitIfSpaceAvailible()
|
||||
{
|
||||
using var s = new RequestQueue(2);
|
||||
using var s = TestUtils.CreateRequestQueue(2);
|
||||
|
||||
var t1 = s.EnterQueue();
|
||||
var t1 = s.TryEnterQueueAsync();
|
||||
Assert.True(t1.IsCompleted);
|
||||
|
||||
var t2 = s.EnterQueue();
|
||||
var t2 = s.TryEnterQueueAsync();
|
||||
Assert.True(t2.IsCompleted);
|
||||
|
||||
var t3 = s.EnterQueue();
|
||||
var t3 = s.TryEnterQueueAsync();
|
||||
Assert.False(t3.IsCompleted);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitsIfNoSpaceAvailible()
|
||||
{
|
||||
using var s = new RequestQueue(1);
|
||||
await s.EnterQueue().OrTimeout();
|
||||
using var s = TestUtils.CreateRequestQueue(1);
|
||||
Assert.True(await s.TryEnterQueueAsync().OrTimeout());
|
||||
|
||||
var waitingTask = s.EnterQueue();
|
||||
var waitingTask = s.TryEnterQueueAsync();
|
||||
Assert.False(waitingTask.IsCompleted);
|
||||
|
||||
s.Release();
|
||||
await waitingTask.OrTimeout();
|
||||
Assert.True(await waitingTask.OrTimeout());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IsEncapsulated()
|
||||
{
|
||||
using var s1 = new RequestQueue(1);
|
||||
using var s2 = new RequestQueue(1);
|
||||
using var s1 = TestUtils.CreateRequestQueue(1);
|
||||
using var s2 = TestUtils.CreateRequestQueue(1);
|
||||
|
||||
await s1.EnterQueue().OrTimeout();
|
||||
await s2.EnterQueue().OrTimeout();
|
||||
Assert.True(await s1.TryEnterQueueAsync().OrTimeout());
|
||||
Assert.True(await s2.TryEnterQueueAsync().OrTimeout());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,16 +6,18 @@ using Microsoft.AspNetCore.RequestThrottling;
|
|||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Microsoft.AspNetCore.RequestThrottling.Internal;
|
||||
|
||||
namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
||||
{
|
||||
public static class TestUtils
|
||||
{
|
||||
public static RequestThrottlingMiddleware CreateTestMiddleWare(int? maxConcurrentRequests, RequestDelegate next = null)
|
||||
public static RequestThrottlingMiddleware CreateTestMiddleware(int? maxConcurrentRequests, int requestQueueLimit = 5000, RequestDelegate next = null)
|
||||
{
|
||||
var options = new RequestThrottlingOptions
|
||||
{
|
||||
MaxConcurrentRequests = maxConcurrentRequests
|
||||
MaxConcurrentRequests = maxConcurrentRequests,
|
||||
RequestQueueLimit = requestQueueLimit
|
||||
};
|
||||
|
||||
return new RequestThrottlingMiddleware(
|
||||
|
|
@ -24,5 +26,7 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
|
|||
options: Options.Create(options)
|
||||
);
|
||||
}
|
||||
|
||||
internal static RequestQueue CreateRequestQueue(int maxConcurrentRequests) => new RequestQueue(maxConcurrentRequests, 5000);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue