Dylan/resettable tcs (#12453)
* renames, resettable tcs; custom awaitable now dispatches
This commit is contained in:
parent
080660967b
commit
1aebfa681a
|
|
@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Microbenchmarks
|
|||
{
|
||||
private const int _numRequests = 20000;
|
||||
|
||||
private ConcurrencyLimiterMiddleware _middlewareFIFO;
|
||||
private ConcurrencyLimiterMiddleware _middlewareLIFO;
|
||||
private ConcurrencyLimiterMiddleware _middlewareQueue;
|
||||
private ConcurrencyLimiterMiddleware _middlewareStack;
|
||||
private RequestDelegate _restOfServer;
|
||||
|
||||
[GlobalSetup]
|
||||
|
|
@ -22,12 +22,12 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Microbenchmarks
|
|||
{
|
||||
_restOfServer = YieldsThreadInternally ? (RequestDelegate)YieldsThread : (RequestDelegate)CompletesImmediately;
|
||||
|
||||
_middlewareFIFO = TestUtils.CreateTestMiddleware_TailDrop(
|
||||
_middlewareQueue = TestUtils.CreateTestMiddleware_QueuePolicy(
|
||||
maxConcurrentRequests: 1,
|
||||
requestQueueLimit: 100,
|
||||
next: _restOfServer);
|
||||
|
||||
_middlewareLIFO = TestUtils.CreateTestMiddleware_StackPolicy(
|
||||
_middlewareStack = TestUtils.CreateTestMiddleware_StackPolicy(
|
||||
maxConcurrentRequests: 1,
|
||||
requestQueueLimit: 100,
|
||||
next: _restOfServer);
|
||||
|
|
@ -46,20 +46,20 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Microbenchmarks
|
|||
}
|
||||
|
||||
[Benchmark(OperationsPerInvoke = _numRequests)]
|
||||
public async Task WithEmptyQueueOverhead_FIFO()
|
||||
public async Task WithEmptyQueueOverhead_QueuePolicy()
|
||||
{
|
||||
for (int i = 0; i < _numRequests; i++)
|
||||
{
|
||||
await _middlewareFIFO.Invoke(null);
|
||||
await _middlewareQueue.Invoke(null);
|
||||
}
|
||||
}
|
||||
|
||||
[Benchmark(OperationsPerInvoke = _numRequests)]
|
||||
public async Task WithEmptyQueueOverhead_LIFO()
|
||||
public async Task WithEmptyQueueOverhead_StackPolicy()
|
||||
{
|
||||
for (int i = 0; i < _numRequests; i++)
|
||||
{
|
||||
await _middlewareLIFO.Invoke(null);
|
||||
await _middlewareStack.Invoke(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,19 +4,19 @@
|
|||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using BenchmarkDotNet.Attributes;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.ConcurrencyLimiter.Tests;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter.Microbenchmarks
|
||||
{
|
||||
public class QueueFullOverhead
|
||||
{
|
||||
private const int _numRequests = 200;
|
||||
private const int _numRequests = 2000;
|
||||
private int _requestCount = 0;
|
||||
private ManualResetEventSlim _mres = new ManualResetEventSlim();
|
||||
|
||||
private ConcurrencyLimiterMiddleware _middleware_FIFO;
|
||||
private ConcurrencyLimiterMiddleware _middleware_LIFO;
|
||||
private ConcurrencyLimiterMiddleware _middlewareQueue;
|
||||
private ConcurrencyLimiterMiddleware _middlewareStack;
|
||||
|
||||
[Params(8)]
|
||||
public int MaxConcurrentRequests;
|
||||
|
|
@ -24,12 +24,12 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Microbenchmarks
|
|||
[GlobalSetup]
|
||||
public void GlobalSetup()
|
||||
{
|
||||
_middleware_FIFO = TestUtils.CreateTestMiddleware_TailDrop(
|
||||
_middlewareQueue = TestUtils.CreateTestMiddleware_QueuePolicy(
|
||||
maxConcurrentRequests: MaxConcurrentRequests,
|
||||
requestQueueLimit: _numRequests,
|
||||
next: IncrementAndCheck);
|
||||
|
||||
_middleware_LIFO = TestUtils.CreateTestMiddleware_StackPolicy(
|
||||
_middlewareStack = TestUtils.CreateTestMiddleware_StackPolicy(
|
||||
maxConcurrentRequests: MaxConcurrentRequests,
|
||||
requestQueueLimit: _numRequests,
|
||||
next: IncrementAndCheck);
|
||||
|
|
@ -64,26 +64,25 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Microbenchmarks
|
|||
}
|
||||
|
||||
[Benchmark(OperationsPerInvoke = _numRequests)]
|
||||
public void QueueingAll_FIFO()
|
||||
public void QueueingAll_QueuePolicy()
|
||||
{
|
||||
for (int i = 0; i < _numRequests; i++)
|
||||
{
|
||||
_ = _middleware_FIFO.Invoke(null);
|
||||
_ = _middlewareStack.Invoke(null);
|
||||
}
|
||||
|
||||
_mres.Wait();
|
||||
}
|
||||
|
||||
[Benchmark(OperationsPerInvoke = _numRequests)]
|
||||
public void QueueingAll_LIFO()
|
||||
public void QueueingAll_StackPolicy()
|
||||
{
|
||||
for (int i = 0; i < _numRequests; i++)
|
||||
{
|
||||
_ = _middleware_LIFO.Invoke(null);
|
||||
_ = _middlewareQueue.Invoke(null);
|
||||
}
|
||||
|
||||
_mres.Wait();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,96 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using BenchmarkDotNet.Attributes;
|
||||
using Microsoft.AspNetCore.ConcurrencyLimiter.Tests;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter.Microbenchmarks
|
||||
{
|
||||
public class QueueRequestsOverwritten
|
||||
{
|
||||
private const int _numRejects = 5000;
|
||||
private int _queueLength = 20;
|
||||
private int _rejectionCount = 0;
|
||||
private ManualResetEventSlim _mres = new ManualResetEventSlim();
|
||||
|
||||
private ConcurrencyLimiterMiddleware _middlewareQueue;
|
||||
private ConcurrencyLimiterMiddleware _middlewareStack;
|
||||
|
||||
[GlobalSetup]
|
||||
public void GlobalSetup()
|
||||
{
|
||||
_middlewareQueue = TestUtils.CreateTestMiddleware_QueuePolicy(
|
||||
maxConcurrentRequests: 1,
|
||||
requestQueueLimit: 20,
|
||||
next: WaitForever,
|
||||
onRejected: IncrementRejections);
|
||||
|
||||
_middlewareStack = TestUtils.CreateTestMiddleware_StackPolicy(
|
||||
maxConcurrentRequests: 1,
|
||||
requestQueueLimit: 20,
|
||||
next: WaitForever,
|
||||
onRejected: IncrementRejections);
|
||||
}
|
||||
|
||||
[IterationSetup]
|
||||
public void Setup()
|
||||
{
|
||||
_rejectionCount = 0;
|
||||
_mres.Reset();
|
||||
}
|
||||
|
||||
private async Task IncrementRejections(HttpContext context)
|
||||
{
|
||||
if (Interlocked.Increment(ref _rejectionCount) == _numRejects)
|
||||
{
|
||||
_mres.Set();
|
||||
}
|
||||
|
||||
await Task.Yield();
|
||||
}
|
||||
|
||||
private async Task WaitForever(HttpContext context)
|
||||
{
|
||||
await Task.Delay(int.MaxValue);
|
||||
}
|
||||
|
||||
[Benchmark(OperationsPerInvoke = _numRejects)]
|
||||
public void Baseline()
|
||||
{
|
||||
var toSend = _queueLength + _numRejects + 1;
|
||||
for (int i = 0; i < toSend; i++)
|
||||
{
|
||||
_ = IncrementRejections(new DefaultHttpContext());
|
||||
}
|
||||
|
||||
_mres.Wait();
|
||||
}
|
||||
|
||||
[Benchmark(OperationsPerInvoke = _numRejects)]
|
||||
public void RejectingRapidly_QueuePolicy()
|
||||
{
|
||||
var toSend = _queueLength + _numRejects + 1;
|
||||
for (int i = 0; i < toSend; i++)
|
||||
{
|
||||
_ = _middlewareQueue.Invoke(new DefaultHttpContext());
|
||||
}
|
||||
|
||||
_mres.Wait();
|
||||
}
|
||||
|
||||
[Benchmark(OperationsPerInvoke = _numRejects)]
|
||||
public void RejectingRapidly_StackPolicy()
|
||||
{
|
||||
var toSend = _queueLength + _numRejects + 1;
|
||||
for (int i = 0; i < toSend; i++)
|
||||
{
|
||||
_ = _middlewareStack.Invoke(new DefaultHttpContext());
|
||||
}
|
||||
|
||||
_mres.Wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -24,7 +24,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
public partial interface IQueuePolicy
|
||||
{
|
||||
void OnExit();
|
||||
System.Threading.Tasks.Task<bool> TryEnterAsync();
|
||||
System.Threading.Tasks.ValueTask<bool> TryEnterAsync();
|
||||
}
|
||||
public partial class QueuePolicyOptions
|
||||
{
|
||||
|
|
@ -37,7 +37,7 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
{
|
||||
public static partial class QueuePolicyServiceCollectionExtensions
|
||||
{
|
||||
public static Microsoft.Extensions.DependencyInjection.IServiceCollection AddFIFOQueue(this Microsoft.Extensions.DependencyInjection.IServiceCollection services, System.Action<Microsoft.AspNetCore.ConcurrencyLimiter.QueuePolicyOptions> configure) { throw null; }
|
||||
public static Microsoft.Extensions.DependencyInjection.IServiceCollection AddLIFOQueue(this Microsoft.Extensions.DependencyInjection.IServiceCollection services, System.Action<Microsoft.AspNetCore.ConcurrencyLimiter.QueuePolicyOptions> configure) { throw null; }
|
||||
public static Microsoft.Extensions.DependencyInjection.IServiceCollection AddQueuePolicy(this Microsoft.Extensions.DependencyInjection.IServiceCollection services, System.Action<Microsoft.AspNetCore.ConcurrencyLimiter.QueuePolicyOptions> configure) { throw null; }
|
||||
public static Microsoft.Extensions.DependencyInjection.IServiceCollection AddStackPolicy(this Microsoft.Extensions.DependencyInjection.IServiceCollection services, System.Action<Microsoft.AspNetCore.ConcurrencyLimiter.QueuePolicyOptions> configure) { throw null; }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,8 +1,6 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
|
|
@ -17,9 +15,10 @@ namespace ConcurrencyLimiterSample
|
|||
{
|
||||
public void ConfigureServices(IServiceCollection services)
|
||||
{
|
||||
services.AddLIFOQueue((options) => {
|
||||
options.MaxConcurrentRequests = Environment.ProcessorCount;
|
||||
options.RequestQueueLimit = 50;
|
||||
services.AddStackPolicy(options =>
|
||||
{
|
||||
options.MaxConcurrentRequests = 2;
|
||||
options.RequestQueueLimit = 25;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -28,8 +27,7 @@ namespace ConcurrencyLimiterSample
|
|||
app.UseConcurrencyLimiter();
|
||||
app.Run(async context =>
|
||||
{
|
||||
var delay = 100;
|
||||
Task.Delay(delay).Wait();
|
||||
Task.Delay(100).Wait(); // 100ms sync-over-async
|
||||
|
||||
await context.Response.WriteAsync("Hello World!");
|
||||
});
|
||||
|
|
@ -39,7 +37,6 @@ namespace ConcurrencyLimiterSample
|
|||
{
|
||||
new WebHostBuilder()
|
||||
.UseKestrel()
|
||||
.UseContentRoot(Directory.GetCurrentDirectory()) // for cert file
|
||||
.UseStartup<Startup>()
|
||||
.Build()
|
||||
.Run();
|
||||
|
|
|
|||
|
|
@ -2,9 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.Tracing;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using Microsoft.Extensions.Internal;
|
||||
|
||||
|
|
|
|||
|
|
@ -48,19 +48,23 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
{
|
||||
var waitInQueueTask = _queuePolicy.TryEnterAsync();
|
||||
|
||||
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
|
||||
bool result;
|
||||
|
||||
if (waitInQueueTask.IsCompleted)
|
||||
{
|
||||
ConcurrencyLimiterEventSource.Log.QueueSkipped();
|
||||
result = waitInQueueTask.Result;
|
||||
}
|
||||
else
|
||||
{
|
||||
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
|
||||
{
|
||||
await waitInQueueTask;
|
||||
result = await waitInQueueTask;
|
||||
}
|
||||
}
|
||||
|
||||
if (waitInQueueTask.Result)
|
||||
if (result)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
/// When it returns 'true' the request procedes to the server.
|
||||
/// When it returns 'false' the request is rejected immediately.
|
||||
/// </summary>
|
||||
Task<bool> TryEnterAsync();
|
||||
ValueTask<bool> TryEnterAsync();
|
||||
|
||||
/// <summary>
|
||||
/// Called after successful requests have been returned from the server.
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ using Microsoft.Extensions.Options;
|
|||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
||||
{
|
||||
internal class FIFOQueuePolicy : IQueuePolicy, IDisposable
|
||||
internal class QueuePolicy : IQueuePolicy, IDisposable
|
||||
{
|
||||
private readonly int _maxConcurrentRequests;
|
||||
private readonly int _requestQueueLimit;
|
||||
|
|
@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
private object _totalRequestsLock = new object();
|
||||
public int TotalRequests { get; private set; }
|
||||
|
||||
public FIFOQueuePolicy(IOptions<QueuePolicyOptions> options)
|
||||
public QueuePolicy(IOptions<QueuePolicyOptions> options)
|
||||
{
|
||||
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
|
||||
if (_maxConcurrentRequests <= 0)
|
||||
|
|
@ -34,7 +34,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
_serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
|
||||
}
|
||||
|
||||
public async Task<bool> TryEnterAsync()
|
||||
public async ValueTask<bool> TryEnterAsync()
|
||||
{
|
||||
// a return value of 'false' indicates that the request is rejected
|
||||
// a return value of 'true' indicates that the request may proceed
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
||||
{
|
||||
|
|
|
|||
|
|
@ -18,10 +18,10 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
/// <param name="configure">Set the options used by the queue.
|
||||
/// Mandatory, since <see cref="QueuePolicyOptions.MaxConcurrentRequests"></see> must be provided.</param>
|
||||
/// <returns></returns>
|
||||
public static IServiceCollection AddFIFOQueue(this IServiceCollection services, Action<QueuePolicyOptions> configure)
|
||||
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
|
||||
{
|
||||
services.Configure(configure);
|
||||
services.AddSingleton<IQueuePolicy, FIFOQueuePolicy>();
|
||||
services.AddSingleton<IQueuePolicy, QueuePolicy>();
|
||||
return services;
|
||||
}
|
||||
|
||||
|
|
@ -32,10 +32,10 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
/// <param name="configure">Set the options used by the queue.
|
||||
/// Mandatory, since <see cref="QueuePolicyOptions.MaxConcurrentRequests"></see> must be provided.</param>
|
||||
/// <returns></returns>
|
||||
public static IServiceCollection AddLIFOQueue(this IServiceCollection services, Action<QueuePolicyOptions> configure)
|
||||
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
|
||||
{
|
||||
services.Configure(configure);
|
||||
services.AddSingleton<IQueuePolicy, LIFOQueuePolicy>();
|
||||
services.AddSingleton<IQueuePolicy, StackPolicy>();
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,63 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading.Tasks.Sources;
|
||||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
||||
{
|
||||
/// <summary>
|
||||
/// Custom awaiter to allow the StackPolicy to reduce allocations.
|
||||
/// When this completion source has its result checked, it resets itself and returns itself to the cache of its parent StackPolicy.
|
||||
/// Then when the StackPolicy needs a new completion source, it tries to get one from its cache, otherwise it allocates.
|
||||
/// </summary>
|
||||
internal class ResettableBooleanCompletionSource : IValueTaskSource<bool>
|
||||
{
|
||||
ManualResetValueTaskSourceCore<bool> _valueTaskSource;
|
||||
private readonly StackPolicy _queue;
|
||||
|
||||
public ResettableBooleanCompletionSource(StackPolicy queue)
|
||||
{
|
||||
_queue = queue;
|
||||
_valueTaskSource.RunContinuationsAsynchronously = true;
|
||||
}
|
||||
|
||||
public ValueTask<bool> GetValueTask()
|
||||
{
|
||||
return new ValueTask<bool>(this, _valueTaskSource.Version);
|
||||
}
|
||||
|
||||
bool IValueTaskSource<bool>.GetResult(short token)
|
||||
{
|
||||
var isValid = token == _valueTaskSource.Version;
|
||||
try
|
||||
{
|
||||
return _valueTaskSource.GetResult(token);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (isValid)
|
||||
{
|
||||
_valueTaskSource.Reset();
|
||||
_queue._cachedResettableTCS = this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public ValueTaskSourceStatus GetStatus(short token)
|
||||
{
|
||||
return _valueTaskSource.GetStatus(token);
|
||||
}
|
||||
|
||||
void IValueTaskSource<bool>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
|
||||
{
|
||||
_valueTaskSource.OnCompleted(continuation, state, token, flags);
|
||||
}
|
||||
|
||||
public void Complete(bool result)
|
||||
{
|
||||
_valueTaskSource.SetResult(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,3 +1,6 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
|
|
@ -5,30 +8,32 @@ using Microsoft.Extensions.Options;
|
|||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
||||
{
|
||||
internal class LIFOQueuePolicy : IQueuePolicy
|
||||
internal class StackPolicy : IQueuePolicy
|
||||
{
|
||||
private readonly List<TaskCompletionSource<bool>> _buffer;
|
||||
private readonly List<ResettableBooleanCompletionSource> _buffer;
|
||||
public ResettableBooleanCompletionSource _cachedResettableTCS;
|
||||
|
||||
private readonly int _maxQueueCapacity;
|
||||
private readonly int _maxConcurrentRequests;
|
||||
private bool _hasReachedCapacity;
|
||||
private int _head;
|
||||
private int _queueLength;
|
||||
|
||||
private static readonly Task<bool> _trueTask = Task.FromResult(true);
|
||||
|
||||
private readonly object _bufferLock = new Object();
|
||||
|
||||
private readonly static ValueTask<bool> _trueTask = new ValueTask<bool>(true);
|
||||
|
||||
private int _freeServerSpots;
|
||||
|
||||
public LIFOQueuePolicy(IOptions<QueuePolicyOptions> options)
|
||||
public StackPolicy(IOptions<QueuePolicyOptions> options)
|
||||
{
|
||||
_buffer = new List<TaskCompletionSource<bool>>();
|
||||
_buffer = new List<ResettableBooleanCompletionSource>();
|
||||
_maxQueueCapacity = options.Value.RequestQueueLimit;
|
||||
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
|
||||
_freeServerSpots = options.Value.MaxConcurrentRequests;
|
||||
}
|
||||
|
||||
public Task<bool> TryEnterAsync()
|
||||
public ValueTask<bool> TryEnterAsync()
|
||||
{
|
||||
lock (_bufferLock)
|
||||
{
|
||||
|
|
@ -42,12 +47,13 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
if (_queueLength == _maxQueueCapacity)
|
||||
{
|
||||
_hasReachedCapacity = true;
|
||||
_buffer[_head].SetResult(false);
|
||||
_buffer[_head].Complete(false);
|
||||
_queueLength--;
|
||||
}
|
||||
|
||||
// enqueue request with a tcs
|
||||
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
|
||||
_cachedResettableTCS = null;
|
||||
|
||||
if (_hasReachedCapacity || _queueLength < _buffer.Count)
|
||||
{
|
||||
_buffer[_head] = tcs;
|
||||
|
|
@ -64,7 +70,8 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
{
|
||||
_head = 0;
|
||||
}
|
||||
return tcs.Task;
|
||||
|
||||
return tcs.GetValueTask();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -95,8 +102,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter
|
|||
_head--;
|
||||
}
|
||||
|
||||
_buffer[_head].SetResult(true);
|
||||
_buffer[_head] = null;
|
||||
_buffer[_head].Complete(true);
|
||||
_queueLength--;
|
||||
}
|
||||
}
|
||||
|
|
@ -63,7 +63,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
eventListener.EnableEvents(eventSource, EventLevel.Informational, EventKeywords.None,
|
||||
new Dictionary<string, string>
|
||||
{
|
||||
{"EventCounterIntervalSec", "1" }
|
||||
{"EventCounterIntervalSec", ".1" }
|
||||
});
|
||||
|
||||
// Act
|
||||
|
|
@ -104,7 +104,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
eventListener.EnableEvents(eventSource, EventLevel.Informational, EventKeywords.None,
|
||||
new Dictionary<string, string>
|
||||
{
|
||||
{"EventCounterIntervalSec", "1" }
|
||||
{"EventCounterIntervalSec", ".1" }
|
||||
});
|
||||
|
||||
// Act
|
||||
|
|
|
|||
|
|
@ -17,7 +17,8 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
queue: TestQueue.AlwaysTrue,
|
||||
next: httpContext => {
|
||||
next: httpContext =>
|
||||
{
|
||||
flag = true;
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
|
|
@ -27,7 +28,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public async void RequestRejectsIfQueueReturnsFalse()
|
||||
public async Task RequestRejectsIfQueueReturnsFalse()
|
||||
{
|
||||
bool onRejectedInvoked = false;
|
||||
|
||||
|
|
@ -46,7 +47,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public async void RequestsDoesNotEnterIfQueueFull()
|
||||
public async Task RequestsDoesNotEnterIfQueueFull()
|
||||
{
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
queue: TestQueue.AlwaysFalse,
|
||||
|
|
@ -67,11 +68,13 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
|
||||
Assert.Equal(0, testQueue.QueuedRequests);
|
||||
|
||||
_ = middleware.Invoke(new DefaultHttpContext());
|
||||
var task1 = middleware.Invoke(new DefaultHttpContext());
|
||||
Assert.Equal(1, testQueue.QueuedRequests);
|
||||
Assert.False(task1.IsCompleted);
|
||||
|
||||
_ = middleware.Invoke(new DefaultHttpContext());
|
||||
var task2 = middleware.Invoke(new DefaultHttpContext());
|
||||
Assert.Equal(2, testQueue.QueuedRequests);
|
||||
Assert.False(task2.IsCompleted);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
|
@ -121,7 +124,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public async void ExceptionThrownDuringOnRejected()
|
||||
public async Task ExceptionThrownDuringOnRejected()
|
||||
{
|
||||
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
|
||||
|
||||
|
|
@ -175,5 +178,41 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
Assert.Equal(0, concurrent);
|
||||
Assert.Equal(0, testQueue.QueuedRequests);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MiddlewareOnlyCallsGetResultOnce()
|
||||
{
|
||||
var flag = false;
|
||||
|
||||
var queue = new TestQueueForResettableBoolean();
|
||||
var middleware = TestUtils.CreateTestMiddleware(
|
||||
queue,
|
||||
next: async context =>
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
flag = true;
|
||||
});
|
||||
|
||||
queue.Source.Complete(true);
|
||||
await middleware.Invoke(new DefaultHttpContext());
|
||||
|
||||
Assert.True(flag);
|
||||
}
|
||||
|
||||
private class TestQueueForResettableBoolean : IQueuePolicy
|
||||
{
|
||||
public ResettableBooleanCompletionSource Source;
|
||||
public TestQueueForResettableBoolean()
|
||||
{
|
||||
Source = new ResettableBooleanCompletionSource(TestUtils.CreateStackPolicy(1));
|
||||
}
|
||||
|
||||
public ValueTask<bool> TryEnterAsync()
|
||||
{
|
||||
return Source.GetValueTask();
|
||||
}
|
||||
|
||||
public void OnExit() { }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,18 +2,16 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Internal;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
||||
{
|
||||
public class TailDropTests
|
||||
public class QueuePolicyTests
|
||||
{
|
||||
[Fact]
|
||||
public void DoesNotWaitIfSpaceAvailible()
|
||||
{
|
||||
using var s = TestUtils.CreateTailDropQueue(2);
|
||||
using var s = TestUtils.CreateQueuePolicy(2);
|
||||
|
||||
var t1 = s.TryEnterAsync();
|
||||
Assert.True(t1.IsCompleted);
|
||||
|
|
@ -28,7 +26,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
|||
[Fact]
|
||||
public async Task WaitsIfNoSpaceAvailible()
|
||||
{
|
||||
using var s = TestUtils.CreateTailDropQueue(1);
|
||||
using var s = TestUtils.CreateQueuePolicy(1);
|
||||
Assert.True(await s.TryEnterAsync().OrTimeout());
|
||||
|
||||
var waitingTask = s.TryEnterAsync();
|
||||
|
|
@ -41,8 +39,8 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
|||
[Fact]
|
||||
public async Task IsEncapsulated()
|
||||
{
|
||||
using var s1 = TestUtils.CreateTailDropQueue(1);
|
||||
using var s2 = TestUtils.CreateTailDropQueue(1);
|
||||
using var s1 = TestUtils.CreateQueuePolicy(1);
|
||||
using var s2 = TestUtils.CreateQueuePolicy(1);
|
||||
|
||||
Assert.True(await s1.TryEnterAsync().OrTimeout());
|
||||
Assert.True(await s2.TryEnterAsync().OrTimeout());
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
||||
{
|
||||
public static class ResettableBooleanCompletionSourceTests
|
||||
{
|
||||
private static StackPolicy _testQueue = TestUtils.CreateStackPolicy(8);
|
||||
|
||||
[Fact]
|
||||
public async static void CanBeAwaitedMultipleTimes()
|
||||
{
|
||||
var tcs = new ResettableBooleanCompletionSource(_testQueue);
|
||||
|
||||
tcs.Complete(true);
|
||||
Assert.True(await tcs.GetValueTask());
|
||||
|
||||
tcs.Complete(true);
|
||||
Assert.True(await tcs.GetValueTask());
|
||||
|
||||
tcs.Complete(false);
|
||||
Assert.False(await tcs.GetValueTask());
|
||||
|
||||
tcs.Complete(false);
|
||||
Assert.False(await tcs.GetValueTask());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async static void CanSetResultToTrue()
|
||||
{
|
||||
var tcs = new ResettableBooleanCompletionSource(_testQueue);
|
||||
|
||||
_ = Task.Run(() =>
|
||||
{
|
||||
tcs.Complete(true);
|
||||
});
|
||||
|
||||
var result = await tcs.GetValueTask();
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async static void CanSetResultToFalse()
|
||||
{
|
||||
var tcs = new ResettableBooleanCompletionSource(_testQueue);
|
||||
|
||||
_ = Task.Run(() =>
|
||||
{
|
||||
tcs.Complete(false);
|
||||
});
|
||||
|
||||
var result = await tcs.GetValueTask();
|
||||
Assert.False(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public static void DoubleCallToGetResultCausesError()
|
||||
{
|
||||
// important to verify it throws rather than acting like a new task
|
||||
|
||||
var tcs = new ResettableBooleanCompletionSource(_testQueue);
|
||||
var task = tcs.GetValueTask();
|
||||
tcs.Complete(true);
|
||||
|
||||
Assert.True(task.Result);
|
||||
Assert.Throws<InvalidOperationException>(() => task.Result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public static Task RunsContinuationsAsynchronously()
|
||||
{
|
||||
var tcs = new TaskCompletionSource<object>();
|
||||
|
||||
async void RunTest()
|
||||
{
|
||||
try
|
||||
{
|
||||
await RunsContinuationsAsynchronouslyInternally();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
tcs.SetException(ex);
|
||||
throw;
|
||||
}
|
||||
|
||||
tcs.SetResult(null);
|
||||
}
|
||||
|
||||
// The Xunit TestSyncContext causes the resettable tcs to always dispatch in effect.
|
||||
ThreadPool.UnsafeQueueUserWorkItem(_ => RunTest(), state: null);
|
||||
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
private static async Task RunsContinuationsAsynchronouslyInternally()
|
||||
{
|
||||
var tcs = new ResettableBooleanCompletionSource(_testQueue);
|
||||
var mre = new ManualResetEventSlim();
|
||||
|
||||
async Task AwaitAndBlock()
|
||||
{
|
||||
await tcs.GetValueTask();
|
||||
mre.Wait();
|
||||
}
|
||||
|
||||
var task = AwaitAndBlock();
|
||||
|
||||
await Task.Run(() => tcs.Complete(true)).OrTimeout();
|
||||
|
||||
Assert.False(task.IsCompleted);
|
||||
|
||||
mre.Set();
|
||||
await task.OrTimeout();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,3 +1,6 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
|
@ -5,12 +8,12 @@ using Xunit;
|
|||
|
||||
namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
||||
{
|
||||
public static class StackQueueTests
|
||||
public static class StackPolicyTests
|
||||
{
|
||||
[Fact]
|
||||
public static void BaseFunctionality()
|
||||
{
|
||||
var stack = new LIFOQueuePolicy(Options.Create(new QueuePolicyOptions {
|
||||
var stack = new StackPolicy(Options.Create(new QueuePolicyOptions {
|
||||
MaxConcurrentRequests = 0,
|
||||
RequestQueueLimit = 2,
|
||||
}));
|
||||
|
|
@ -23,11 +26,11 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
|||
|
||||
Assert.True(task1.IsCompleted && task1.Result);
|
||||
}
|
||||
|
||||
|
||||
[Fact]
|
||||
public static void OldestRequestOverwritten()
|
||||
{
|
||||
var stack = new LIFOQueuePolicy(Options.Create(new QueuePolicyOptions {
|
||||
var stack = new StackPolicy(Options.Create(new QueuePolicyOptions {
|
||||
MaxConcurrentRequests = 0,
|
||||
RequestQueueLimit = 3,
|
||||
}));
|
||||
|
|
@ -40,16 +43,19 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
|||
Assert.False(task3.IsCompleted);
|
||||
|
||||
var task4 = stack.TryEnterAsync();
|
||||
Assert.False(task4.IsCompleted);
|
||||
|
||||
|
||||
Assert.True(task1.IsCompleted);
|
||||
Assert.False(task1.Result);
|
||||
|
||||
Assert.False(task2.IsCompleted);
|
||||
Assert.False(task3.IsCompleted);
|
||||
Assert.False(task4.IsCompleted);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public static void RespectsMaxConcurrency()
|
||||
{
|
||||
var stack = new LIFOQueuePolicy(Options.Create(new QueuePolicyOptions {
|
||||
var stack = new StackPolicy(Options.Create(new QueuePolicyOptions {
|
||||
MaxConcurrentRequests = 2,
|
||||
RequestQueueLimit = 2,
|
||||
}));
|
||||
|
|
@ -67,7 +73,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
|||
[Fact]
|
||||
public static void ExitRequestsPreserveSemaphoreState()
|
||||
{
|
||||
var stack = new LIFOQueuePolicy(Options.Create(new QueuePolicyOptions {
|
||||
var stack = new StackPolicy(Options.Create(new QueuePolicyOptions {
|
||||
MaxConcurrentRequests = 1,
|
||||
RequestQueueLimit = 2,
|
||||
}));
|
||||
|
|
@ -90,7 +96,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
|||
[Fact]
|
||||
public static void StaleRequestsAreProperlyOverwritten()
|
||||
{
|
||||
var stack = new LIFOQueuePolicy(Options.Create(new QueuePolicyOptions
|
||||
var stack = new StackPolicy(Options.Create(new QueuePolicyOptions
|
||||
{
|
||||
MaxConcurrentRequests = 0,
|
||||
RequestQueueLimit = 4,
|
||||
|
|
@ -108,7 +114,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests.PolicyTests
|
|||
[Fact]
|
||||
public static async Task OneTryEnterAsyncOneOnExit()
|
||||
{
|
||||
var stack = new LIFOQueuePolicy(Options.Create(new QueuePolicyOptions
|
||||
var stack = new StackPolicy(Options.Create(new QueuePolicyOptions
|
||||
{
|
||||
MaxConcurrentRequests = 1,
|
||||
RequestQueueLimit = 4,
|
||||
|
|
@ -22,15 +22,15 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
return new ConcurrencyLimiterMiddleware(
|
||||
next: next ?? (context => Task.CompletedTask),
|
||||
loggerFactory: NullLoggerFactory.Instance,
|
||||
queue: queue ?? CreateTailDropQueue(1, 0),
|
||||
queue: queue ?? CreateQueuePolicy(1, 0),
|
||||
options: options
|
||||
);
|
||||
}
|
||||
|
||||
public static ConcurrencyLimiterMiddleware CreateTestMiddleware_TailDrop(int maxConcurrentRequests, int requestQueueLimit, RequestDelegate onRejected = null, RequestDelegate next = null)
|
||||
public static ConcurrencyLimiterMiddleware CreateTestMiddleware_QueuePolicy(int maxConcurrentRequests, int requestQueueLimit, RequestDelegate onRejected = null, RequestDelegate next = null)
|
||||
{
|
||||
return CreateTestMiddleware(
|
||||
queue: CreateTailDropQueue(maxConcurrentRequests, requestQueueLimit),
|
||||
queue: CreateQueuePolicy(maxConcurrentRequests, requestQueueLimit),
|
||||
onRejected: onRejected,
|
||||
next: next
|
||||
);
|
||||
|
|
@ -45,7 +45,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
);
|
||||
}
|
||||
|
||||
internal static LIFOQueuePolicy CreateStackPolicy(int maxConcurrentRequests, int requestsQueuelimit = 100)
|
||||
internal static StackPolicy CreateStackPolicy(int maxConcurrentRequests, int requestsQueuelimit = 100)
|
||||
{
|
||||
var options = Options.Create(new QueuePolicyOptions
|
||||
{
|
||||
|
|
@ -53,10 +53,10 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
RequestQueueLimit = requestsQueuelimit
|
||||
});
|
||||
|
||||
return new LIFOQueuePolicy(options);
|
||||
return new StackPolicy(options);
|
||||
}
|
||||
|
||||
internal static FIFOQueuePolicy CreateTailDropQueue(int maxConcurrentRequests, int requestQueueLimit = 100)
|
||||
internal static QueuePolicy CreateQueuePolicy(int maxConcurrentRequests, int requestQueueLimit = 100)
|
||||
{
|
||||
var options = Options.Create(new QueuePolicyOptions
|
||||
{
|
||||
|
|
@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
RequestQueueLimit = requestQueueLimit
|
||||
});
|
||||
|
||||
return new FIFOQueuePolicy(options);
|
||||
return new QueuePolicy(options);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -83,13 +83,10 @@ namespace Microsoft.AspNetCore.ConcurrencyLimiter.Tests
|
|||
}
|
||||
|
||||
public TestQueue(Func<TestQueue, bool> onTryEnter, Action onExit = null) :
|
||||
this(async (state) =>
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
return onTryEnter(state);
|
||||
}, onExit) { }
|
||||
this(state => Task.FromResult(onTryEnter(state))
|
||||
, onExit) { }
|
||||
|
||||
public async Task<bool> TryEnterAsync()
|
||||
public async ValueTask<bool> TryEnterAsync()
|
||||
{
|
||||
Interlocked.Increment(ref _queuedRequests);
|
||||
var result = await _onTryEnter(this);
|
||||
|
|
|
|||
Loading…
Reference in New Issue