Dylan/event source (#11516)

* Queuing Middleware now supports EventSources. Visibility for: items in queue, duration spent in queue, number of rejected requests.
This commit is contained in:
Dylan Dmitri Gray 2019-06-28 15:20:50 -07:00 committed by GitHub
parent 81b757afcc
commit 3cb414afdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 433 additions and 122 deletions

View File

@ -5,7 +5,6 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics.Tracing; using System.Diagnostics.Tracing;
using System.Threading; using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Internal; using Microsoft.AspNetCore.Internal;
@ -185,7 +184,7 @@ namespace Microsoft.AspNetCore.Hosting
public async Task VerifyCountersFireWithCorrectValues() public async Task VerifyCountersFireWithCorrectValues()
{ {
// Arrange // Arrange
var eventListener = new CounterListener(new[] { var eventListener = new TestCounterListener(new[] {
"requests-per-second", "requests-per-second",
"total-requests", "total-requests",
"current-requests", "current-requests",
@ -207,6 +206,7 @@ namespace Microsoft.AspNetCore.Hosting
{ "EventCounterIntervalSec", "1" } { "EventCounterIntervalSec", "1" }
}); });
// Act & Assert
hostingEventSource.RequestStart("GET", "/"); hostingEventSource.RequestStart("GET", "/");
Assert.Equal(1, await totalRequestValues.FirstOrDefault(v => v == 1)); Assert.Equal(1, await totalRequestValues.FirstOrDefault(v => v == 1));
@ -241,36 +241,5 @@ namespace Microsoft.AspNetCore.Hosting
{ {
return new HostingEventSource(Guid.NewGuid().ToString()); return new HostingEventSource(Guid.NewGuid().ToString());
} }
private class CounterListener : EventListener
{
private readonly Dictionary<string, Channel<double>> _counters = new Dictionary<string, Channel<double>>();
public CounterListener(string[] counterNames)
{
foreach (var item in counterNames)
{
_counters[item] = Channel.CreateUnbounded<double>();
}
}
public IAsyncEnumerable<double> GetCounterValues(string counterName, CancellationToken cancellationToken = default)
{
return _counters[counterName].Reader.ReadAllAsync(cancellationToken);
}
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
if (eventData.EventName == "EventCounters")
{
var payload = (IDictionary<string, object>)eventData.Payload[0];
var counter = (string)payload["Name"];
payload.TryGetValue("Increment", out var increment);
payload.TryGetValue("Mean", out var mean);
var writer = _counters[counter].Writer;
writer.TryWrite((double)(increment ?? mean));
}
}
}
} }
} }

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework> <TargetFramework>netcoreapp3.0</TargetFramework>
@ -7,6 +7,7 @@
<ItemGroup> <ItemGroup>
<Compile Include="$(SharedSourceRoot)test\SkipOnHelixAttribute.cs" /> <Compile Include="$(SharedSourceRoot)test\SkipOnHelixAttribute.cs" />
<Compile Include="$(SharedSourceRoot)EventSource.Testing\TestEventListener.cs" /> <Compile Include="$(SharedSourceRoot)EventSource.Testing\TestEventListener.cs" />
<Compile Include="$(SharedSourceRoot)EventSource.Testing\TestCounterListener.cs" />
<Content Include="testroot\**\*" CopyToOutputDirectory="PreserveNewest" CopyToPublishDirectory="PreserveNewest" /> <Content Include="testroot\**\*" CopyToOutputDirectory="PreserveNewest" CopyToPublishDirectory="PreserveNewest" />
<Content Include="Microsoft.AspNetCore.Hosting.StaticWebAssets.xml" CopyToOutputDirectory="PreserveNewest" /> <Content Include="Microsoft.AspNetCore.Hosting.StaticWebAssets.xml" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup> </ItemGroup>

View File

@ -8,5 +8,6 @@
<Reference Include="Microsoft.AspNetCore.Http.Abstractions" /> <Reference Include="Microsoft.AspNetCore.Http.Abstractions" />
<Reference Include="Microsoft.Extensions.Logging.Abstractions" /> <Reference Include="Microsoft.Extensions.Logging.Abstractions" />
<Reference Include="Microsoft.Extensions.Options" /> <Reference Include="Microsoft.Extensions.Options" />
<Reference Include="Microsoft.Extensions.ValueStopwatch.Sources" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -10,15 +10,9 @@ namespace Microsoft.AspNetCore.Builder
} }
namespace Microsoft.AspNetCore.RequestThrottling namespace Microsoft.AspNetCore.RequestThrottling
{ {
public partial interface IQueuePolicy
{
void OnExit();
System.Threading.Tasks.Task<bool> TryEnterAsync();
}
public partial class RequestThrottlingMiddleware public partial class RequestThrottlingMiddleware
{ {
public RequestThrottlingMiddleware(Microsoft.AspNetCore.Http.RequestDelegate next, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, Microsoft.AspNetCore.RequestThrottling.IQueuePolicy queue, Microsoft.Extensions.Options.IOptions<Microsoft.AspNetCore.RequestThrottling.RequestThrottlingOptions> options) { } public RequestThrottlingMiddleware(Microsoft.AspNetCore.Http.RequestDelegate next, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, Microsoft.AspNetCore.RequestThrottling.QueuePolicies.IQueuePolicy queue, Microsoft.Extensions.Options.IOptions<Microsoft.AspNetCore.RequestThrottling.RequestThrottlingOptions> options) { }
public int QueuedRequestCount { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute] [System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task Invoke(Microsoft.AspNetCore.Http.HttpContext context) { throw null; } public System.Threading.Tasks.Task Invoke(Microsoft.AspNetCore.Http.HttpContext context) { throw null; }
} }
@ -30,6 +24,11 @@ namespace Microsoft.AspNetCore.RequestThrottling
} }
namespace Microsoft.AspNetCore.RequestThrottling.QueuePolicies namespace Microsoft.AspNetCore.RequestThrottling.QueuePolicies
{ {
public partial interface IQueuePolicy
{
void OnExit();
System.Threading.Tasks.Task<bool> TryEnterAsync();
}
public partial class QueuePolicyOptions public partial class QueuePolicyOptions
{ {
public QueuePolicyOptions() { } public QueuePolicyOptions() { }

View File

@ -22,9 +22,9 @@ namespace RequestThrottlingSample
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services) public void ConfigureServices(IServiceCollection services)
{ {
services.AddTailDropQueue((options) => services.AddStackQueue((options) =>
{ {
options.MaxConcurrentRequests = Math.Max(1, _config.GetValue<int>("maxCores")); options.MaxConcurrentRequests = Math.Max(1, _config.GetValue<int>("maxConcurrent"));
options.RequestQueueLimit = Math.Max(1, _config.GetValue<int>("maxQueue")); options.RequestQueueLimit = Math.Max(1, _config.GetValue<int>("maxQueue"));
}); });

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<Description>ASP.NET Core middleware for queuing incoming HTTP requests, to avoid threadpool starvation.</Description> <Description>ASP.NET Core middleware for queuing incoming HTTP requests, to avoid threadpool starvation.</Description>
@ -12,6 +12,7 @@
<Reference Include="Microsoft.AspNetCore.Http.Abstractions" /> <Reference Include="Microsoft.AspNetCore.Http.Abstractions" />
<Reference Include="Microsoft.Extensions.Logging.Abstractions" /> <Reference Include="Microsoft.Extensions.Logging.Abstractions" />
<Reference Include="Microsoft.Extensions.Options" /> <Reference Include="Microsoft.Extensions.Options" />
<Reference Include="Microsoft.Extensions.ValueStopwatch.Sources" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -4,7 +4,7 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Microsoft.AspNetCore.RequestThrottling namespace Microsoft.AspNetCore.RequestThrottling.QueuePolicies
{ {
/// <summary> /// <summary>
/// Queueing policies, meant to be used with the <see cref="RequestThrottlingMiddleware"></see>. /// Queueing policies, meant to be used with the <see cref="RequestThrottlingMiddleware"></see>.

View File

@ -0,0 +1,109 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information
using System;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Text;
using System.Threading;
using Microsoft.Extensions.Internal;
namespace Microsoft.AspNetCore.RequestThrottling
{
internal sealed class RequestThrottlingEventSource : EventSource
{
public static readonly RequestThrottlingEventSource Log = new RequestThrottlingEventSource();
private static QueueFrame CachedNonTimerResult = new QueueFrame(null, Log);
private PollingCounter _rejectedRequestsCounter;
private PollingCounter _queueLengthCounter;
private EventCounter _queueDuration;
private long _rejectedRequests;
private int _queueLength;
internal RequestThrottlingEventSource()
: base("Microsoft.AspNetCore.RequestThrottling")
{
}
// Used for testing
internal RequestThrottlingEventSource(string eventSourceName)
: base(eventSourceName)
{
}
[Event(1, Level = EventLevel.Warning)]
public void RequestRejected()
{
Interlocked.Increment(ref _rejectedRequests);
WriteEvent(1);
}
[NonEvent]
public void QueueSkipped()
{
if (IsEnabled())
{
_queueDuration.WriteMetric(0);
}
}
[NonEvent]
public QueueFrame QueueTimer()
{
Interlocked.Increment(ref _queueLength);
if (IsEnabled())
{
return new QueueFrame(ValueStopwatch.StartNew(), this);
}
return CachedNonTimerResult;
}
internal struct QueueFrame : IDisposable
{
private ValueStopwatch? _timer;
private RequestThrottlingEventSource _parent;
public QueueFrame(ValueStopwatch? timer, RequestThrottlingEventSource parent)
{
_timer = timer;
_parent = parent;
}
public void Dispose()
{
Interlocked.Decrement(ref _parent._queueLength);
if (_parent.IsEnabled() && _timer != null)
{
var duration = _timer.Value.GetElapsedTime().TotalMilliseconds;
_parent._queueDuration.WriteMetric(duration);
}
}
}
protected override void OnEventCommand(EventCommandEventArgs command)
{
if (command.Command == EventCommand.Enable)
{
_rejectedRequestsCounter ??= new PollingCounter("requests-rejected", this, () => _rejectedRequests)
{
DisplayName = "Rejected Requests",
};
_queueLengthCounter ??= new PollingCounter("queue-length", this, () => _queueLength)
{
DisplayName = "Queue Length",
};
_queueDuration ??= new EventCounter("queue-duration", this)
{
DisplayName = "Average Time in Queue",
};
}
}
}
}

View File

@ -2,9 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.RequestThrottling.QueuePolicies;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -20,8 +20,6 @@ namespace Microsoft.AspNetCore.RequestThrottling
private readonly RequestDelegate _onRejected; private readonly RequestDelegate _onRejected;
private readonly ILogger _logger; private readonly ILogger _logger;
private int _queuedRequests;
/// <summary> /// <summary>
/// Creates a new <see cref="RequestThrottlingMiddleware"/>. /// Creates a new <see cref="RequestThrottlingMiddleware"/>.
/// </summary> /// </summary>
@ -49,19 +47,21 @@ namespace Microsoft.AspNetCore.RequestThrottling
/// <returns>A <see cref="Task"/> that completes when the request leaves.</returns> /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
public async Task Invoke(HttpContext context) public async Task Invoke(HttpContext context)
{ {
Interlocked.Increment(ref _queuedRequests); var waitInQueueTask = _queuePolicy.TryEnterAsync();
var success = false; if (waitInQueueTask.IsCompleted)
try
{ {
success = await _queuePolicy.TryEnterAsync(); RequestThrottlingEventSource.Log.QueueSkipped();
} }
finally else
{ {
Interlocked.Decrement(ref _queuedRequests); using (RequestThrottlingEventSource.Log.QueueTimer())
{
await waitInQueueTask;
}
} }
if (success) if (waitInQueueTask.Result)
{ {
try try
{ {
@ -74,20 +74,13 @@ namespace Microsoft.AspNetCore.RequestThrottling
} }
else else
{ {
RequestThrottlingEventSource.Log.RequestRejected();
RequestThrottlingLog.RequestRejectedQueueFull(_logger); RequestThrottlingLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context); await _onRejected(context);
} }
} }
/// <summary>
/// The total number of requests waiting within the middleware
/// </summary>
public int QueuedRequestCount
{
get => _queuedRequests;
}
private static class RequestThrottlingLog private static class RequestThrottlingLog
{ {
private static readonly Action<ILogger, int, Exception> _requestEnqueued = private static readonly Action<ILogger, int, Exception> _requestEnqueued =

View File

@ -5,7 +5,8 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Compile Include="$(SharedSourceRoot)SyncPoint\SyncPoint.cs" /> <Compile Include="$(SharedSourceRoot)EventSource.Testing\TestCounterListener.cs" />
<Compile Include="$(SharedSourceRoot)EventSource.Testing\TestEventListener.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -2,8 +2,11 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.RequestThrottling.QueuePolicies;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Xunit; using Xunit;
namespace Microsoft.AspNetCore.RequestThrottling.Tests namespace Microsoft.AspNetCore.RequestThrottling.Tests
@ -16,8 +19,8 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
var flag = false; var flag = false;
var middleware = TestUtils.CreateTestMiddleware( var middleware = TestUtils.CreateTestMiddleware(
queue: TestStrategy.AlwaysPass, queue: TestQueue.AlwaysTrue,
next: (context) => { next: httpContext => {
flag = true; flag = true;
return Task.CompletedTask; return Task.CompletedTask;
}); });
@ -27,23 +30,12 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
} }
[Fact] [Fact]
public async Task RequestRejectsIfQueueReturnsFalse() public async void RequestRejectsIfQueueReturnsFalse()
{
var middleware = TestUtils.CreateTestMiddleware(
queue: TestStrategy.AlwaysReject);
var context = new DefaultHttpContext();
await middleware.Invoke(context);
Assert.Equal(StatusCodes.Status503ServiceUnavailable, context.Response.StatusCode);
}
[Fact]
public async void FullQueueInvokesOnRejected()
{ {
bool onRejectedInvoked = false; bool onRejectedInvoked = false;
var middleware = TestUtils.CreateTestMiddleware( var middleware = TestUtils.CreateTestMiddleware(
queue: TestStrategy.AlwaysReject, queue: TestQueue.AlwaysFalse,
onRejected: httpContext => onRejected: httpContext =>
{ {
onRejectedInvoked = true; onRejectedInvoked = true;
@ -57,14 +49,14 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
} }
[Fact] [Fact]
public async void RequestsBlockedIfQueueFull() public async void RequestsDoesNotEnterIfQueueFull()
{ {
var middleware = TestUtils.CreateTestMiddleware( var middleware = TestUtils.CreateTestMiddleware(
queue: TestStrategy.AlwaysReject, queue: TestQueue.AlwaysFalse,
next: httpContext => next: httpContext =>
{ {
// throttle should bounce the request; it should never get here // throttle should bounce the request; it should never get here
throw new NotImplementedException(); throw new DivideByZeroException();
}); });
await middleware.Invoke(new DefaultHttpContext()).OrTimeout(); await middleware.Invoke(new DefaultHttpContext()).OrTimeout();
@ -73,69 +65,118 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
[Fact] [Fact]
public void IncomingRequestsFillUpQueue() public void IncomingRequestsFillUpQueue()
{ {
var middleware = TestUtils.CreateTestMiddleware( var testQueue = TestQueue.AlwaysBlock;
queue: TestStrategy.AlwaysBlock); var middleware = TestUtils.CreateTestMiddleware(testQueue);
Assert.Equal(0, middleware.QueuedRequestCount); Assert.Equal(0, testQueue.QueuedRequests);
_ = middleware.Invoke(new DefaultHttpContext()); _ = middleware.Invoke(new DefaultHttpContext());
Assert.Equal(1, middleware.QueuedRequestCount); Assert.Equal(1, testQueue.QueuedRequests);
_ = middleware.Invoke(new DefaultHttpContext()); _ = middleware.Invoke(new DefaultHttpContext());
Assert.Equal(2, middleware.QueuedRequestCount); Assert.Equal(2, testQueue.QueuedRequests);
} }
[Fact] [Fact]
public async Task CleanupHappensEvenIfNextErrors() public void EventCountersTrackQueuedRequests()
{
var blocker = new TaskCompletionSource<bool>();
var testQueue = new TestQueue(
onTryEnter: async (_) =>
{
return await blocker.Task;
});
var middleware = TestUtils.CreateTestMiddleware(testQueue);
Assert.Equal(0, testQueue.QueuedRequests);
var task1 = middleware.Invoke(new DefaultHttpContext());
Assert.False(task1.IsCompleted);
Assert.Equal(1, testQueue.QueuedRequests);
blocker.SetResult(true);
Assert.Equal(0, testQueue.QueuedRequests);
}
[Fact]
public async Task QueueOnExitCalledEvenIfNextErrors()
{ {
var flag = false; var flag = false;
var testQueue = new TestQueue(
onTryEnter: (_) => true,
onExit: () => { flag = true; });
var middleware = TestUtils.CreateTestMiddleware( var middleware = TestUtils.CreateTestMiddleware(
queue: new TestStrategy( queue: testQueue,
invoke: (() => true),
onExit: () => { flag = true; }),
next: httpContext => next: httpContext =>
{ {
throw new DivideByZeroException(); throw new DivideByZeroException();
}); });
Assert.Equal(0, middleware.QueuedRequestCount); Assert.Equal(0, testQueue.QueuedRequests);
await Assert.ThrowsAsync<DivideByZeroException>(() => middleware.Invoke(new DefaultHttpContext())).OrTimeout(); await Assert.ThrowsAsync<DivideByZeroException>(() => middleware.Invoke(new DefaultHttpContext())).OrTimeout();
Assert.Equal(0, middleware.QueuedRequestCount); Assert.Equal(0, testQueue.QueuedRequests);
Assert.True(flag); Assert.True(flag);
} }
[Fact] [Fact]
public async void ExceptionThrownDuringOnRejected() public async void ExceptionThrownDuringOnRejected()
{ {
TaskCompletionSource<bool> tsc = new TaskCompletionSource<bool>(); TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
var concurrent = 0;
var testQueue = new TestQueue(
onTryEnter: (testQueue) =>
{
if (concurrent > 0)
{
return false;
}
else
{
concurrent++;
return true;
}
},
onExit: () => { concurrent--; });
var middleware = TestUtils.CreateTestMiddleware( var middleware = TestUtils.CreateTestMiddleware(
queue: testQueue,
onRejected: httpContext => onRejected: httpContext =>
{ {
throw new DivideByZeroException(); throw new DivideByZeroException();
}, },
next: httpContext => next: httpContext =>
{ {
return tsc.Task; return tcs.Task;
}); });
// the first request enters the server, and is blocked by the tcs
var firstRequest = middleware.Invoke(new DefaultHttpContext()); var firstRequest = middleware.Invoke(new DefaultHttpContext());
Assert.Equal(1, concurrent);
Assert.Equal(0, testQueue.QueuedRequests);
// the second request is rejected with a 503 error. During the rejection, an error occurs
var context = new DefaultHttpContext(); var context = new DefaultHttpContext();
await Assert.ThrowsAsync<DivideByZeroException>(() => middleware.Invoke(context)).OrTimeout(); await Assert.ThrowsAsync<DivideByZeroException>(() => middleware.Invoke(context)).OrTimeout();
Assert.Equal(StatusCodes.Status503ServiceUnavailable, context.Response.StatusCode); Assert.Equal(StatusCodes.Status503ServiceUnavailable, context.Response.StatusCode);
Assert.Equal(1, concurrent);
Assert.Equal(0, testQueue.QueuedRequests);
tsc.SetResult(true); // the first request is unblocked, and the queue continues functioning as expected
tcs.SetResult(true);
Assert.True(firstRequest.IsCompletedSuccessfully); Assert.True(firstRequest.IsCompletedSuccessfully);
Assert.Equal(0, concurrent);
Assert.Equal(0, testQueue.QueuedRequests);
var thirdRequest = middleware.Invoke(new DefaultHttpContext()); var thirdRequest = middleware.Invoke(new DefaultHttpContext());
Assert.True(thirdRequest.IsCompletedSuccessfully); Assert.True(thirdRequest.IsCompletedSuccessfully);
Assert.Equal(0, concurrent);
Assert.Equal(0, middleware.QueuedRequestCount); Assert.Equal(0, testQueue.QueuedRequests);
} }
} }
} }

View File

@ -6,7 +6,7 @@ using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Internal; using Microsoft.AspNetCore.Internal;
using Xunit; using Xunit;
namespace Microsoft.AspNetCore.RequestThrottling.Tests namespace Microsoft.AspNetCore.RequestThrottling.Tests.PolicyTests
{ {
public class TailDropTests public class TailDropTests
{ {

View File

@ -0,0 +1,148 @@
// Copyright(c) .NET Foundation.All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Internal;
using Xunit;
namespace Microsoft.AspNetCore.RequestThrottling.Tests
{
public class RequestThrottlingEventSourceTests
{
[Fact]
public void MatchesNameAndGuid()
{
var eventSource = new RequestThrottlingEventSource();
Assert.Equal("Microsoft.AspNetCore.RequestThrottling", eventSource.Name);
Assert.Equal(Guid.Parse("436f1cb1-8acc-56c0-86ec-e0832bd696ed"), eventSource.Guid);
}
[Fact]
public void RecordsRequestsRejected()
{
// Arrange
var expectedId = 1;
var eventListener = new TestEventListener(expectedId);
var eventSource = GetRequestThrottlingEventSource();
eventListener.EnableEvents(eventSource, EventLevel.Informational);
// Act
eventSource.RequestRejected();
// Assert
var eventData = eventListener.EventData;
Assert.NotNull(eventData);
Assert.Equal(expectedId, eventData.EventId);
Assert.Equal(EventLevel.Warning, eventData.Level);
Assert.Same(eventSource, eventData.EventSource);
Assert.Null(eventData.Message);
Assert.Empty(eventData.Payload);
}
[Fact]
public async Task TracksQueueLength()
{
// Arrange
using var eventListener = new TestCounterListener(new[] {
"queue-length",
"queue-duration",
"requests-rejected",
});
using var eventSource = GetRequestThrottlingEventSource();
using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var lengthValues = eventListener.GetCounterValues("queue-length", timeoutTokenSource.Token).GetAsyncEnumerator();
eventListener.EnableEvents(eventSource, EventLevel.Informational, EventKeywords.None,
new Dictionary<string, string>
{
{"EventCounterIntervalSec", "1" }
});
// Act
eventSource.RequestRejected();
Assert.True(await UntilValueMatches(lengthValues, 0));
using (eventSource.QueueTimer())
{
Assert.True(await UntilValueMatches(lengthValues, 1));
using (eventSource.QueueTimer())
{
Assert.True(await UntilValueMatches(lengthValues, 2));
}
Assert.True(await UntilValueMatches(lengthValues, 1));
}
Assert.True(await UntilValueMatches(lengthValues, 0));
}
[Fact]
public async Task TracksDurationSpentInQueue()
{
// Arrange
using var eventListener = new TestCounterListener(new[] {
"queue-length",
"queue-duration",
"requests-rejected",
});
using var eventSource = GetRequestThrottlingEventSource();
using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var durationValues = eventListener.GetCounterValues("queue-duration", timeoutTokenSource.Token).GetAsyncEnumerator();
eventListener.EnableEvents(eventSource, EventLevel.Informational, EventKeywords.None,
new Dictionary<string, string>
{
{"EventCounterIntervalSec", "1" }
});
// Act
Assert.True(await UntilValueMatches(durationValues, 0));
using (eventSource.QueueTimer())
{
Assert.True(await UntilValueMatches(durationValues, 0));
}
// check that something (anything!) has been written
while (await durationValues.MoveNextAsync())
{
if (durationValues.Current > 0)
{
return;
}
}
throw new TimeoutException();
}
private async Task<bool> UntilValueMatches(IAsyncEnumerator<double> enumerator, int value)
{
while (await enumerator.MoveNextAsync())
{
if (enumerator.Current == value)
{
return true;
}
}
return false;
}
private static RequestThrottlingEventSource GetRequestThrottlingEventSource()
{
return new RequestThrottlingEventSource(Guid.NewGuid().ToString());
}
}
}

View File

@ -69,30 +69,33 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
} }
} }
public class TestStrategy : IQueuePolicy internal class TestQueue : IQueuePolicy
{ {
private Func<Task<bool>> _invoke { get; } private Func<TestQueue, Task<bool>> _onTryEnter { get; }
private Action _onExit { get; } private Action _onExit { get; }
public TestStrategy(Func<Task<bool>> invoke, Action onExit = null) private int _queuedRequests;
public int QueuedRequests { get => _queuedRequests; }
public TestQueue(Func<TestQueue, Task<bool>> onTryEnter, Action onExit = null)
{ {
_invoke = invoke; _onTryEnter = onTryEnter;
_onExit = onExit ?? (() => { }); _onExit = onExit ?? (() => { });
} }
public TestStrategy(Func<bool> invoke, Action onExit = null) public TestQueue(Func<TestQueue, bool> onTryEnter, Action onExit = null) :
: this(async () => this(async (state) =>
{ {
await Task.CompletedTask; await Task.CompletedTask;
return invoke(); return onTryEnter(state);
}, }, onExit) { }
onExit)
{ }
public async Task<bool> TryEnterAsync() public async Task<bool> TryEnterAsync()
{ {
await Task.CompletedTask; Interlocked.Increment(ref _queuedRequests);
return await _invoke(); var result = await _onTryEnter(this);
Interlocked.Decrement(ref _queuedRequests);
return result;
} }
public void OnExit() public void OnExit()
@ -100,14 +103,14 @@ namespace Microsoft.AspNetCore.RequestThrottling.Tests
_onExit(); _onExit();
} }
public static TestStrategy AlwaysReject = public static TestQueue AlwaysFalse =
new TestStrategy(() => false); new TestQueue((_) => false);
public static TestStrategy AlwaysPass = public static TestQueue AlwaysTrue =
new TestStrategy(() => true); new TestQueue((_) => true);
public static TestStrategy AlwaysBlock = public static TestQueue AlwaysBlock =
new TestStrategy(async () => new TestQueue(async (_) =>
{ {
await new SemaphoreSlim(0).WaitAsync(); await new SemaphoreSlim(0).WaitAsync();
return false; return false;

View File

@ -0,0 +1,45 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Threading;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.Internal
{
internal class TestCounterListener : EventListener
{
private readonly Dictionary<string, Channel<double>> _counters = new Dictionary<string, Channel<double>>();
/// <summary>
/// Creates a new TestCounterListener.
/// </summary>
/// <param name="counterNames">The names of ALL counters for the event source. You must name each counter, even if you do not intend to use it.</param>
public TestCounterListener(string[] counterNames)
{
foreach (var item in counterNames)
{
_counters[item] = Channel.CreateUnbounded<double>();
}
}
public IAsyncEnumerable<double> GetCounterValues(string counterName, CancellationToken cancellationToken = default)
{
return _counters[counterName].Reader.ReadAllAsync(cancellationToken);
}
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
if (eventData.EventName == "EventCounters")
{
var payload = (IDictionary<string, object>)eventData.Payload[0];
var counter = (string)payload["Name"];
payload.TryGetValue("Increment", out var increment);
payload.TryGetValue("Mean", out var mean);
var writer = _counters[counter].Writer;
writer.TryWrite((double)(increment ?? mean));
}
}
}
}