Use resettable awaitable in flow control (#19783)

This commit is contained in:
James Newton-King 2020-03-18 15:27:49 +13:00 committed by GitHub
parent a4af51ee1b
commit 84269f3894
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 337 additions and 79 deletions

View File

@ -0,0 +1,92 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Sources;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
internal abstract class AwaitableProvider
{
public abstract ManualResetValueTaskSource<object> GetAwaitable();
public abstract void CompleteCurrent();
public abstract int ActiveCount { get; }
}
/// <summary>
/// Provider returns multiple awaitables. Awaitables are completed FIFO.
/// </summary>
internal class MultipleAwaitableProvider : AwaitableProvider
{
private Queue<ManualResetValueTaskSource<object>> _awaitableQueue;
private Queue<ManualResetValueTaskSource<object>> _awaitableCache;
public override void CompleteCurrent()
{
var awaitable = _awaitableQueue.Dequeue();
awaitable.TrySetResult(null);
// Add completed awaitable to the cache for reuse
_awaitableCache.Enqueue(awaitable);
}
public override ManualResetValueTaskSource<object> GetAwaitable()
{
if (_awaitableQueue == null)
{
_awaitableQueue = new Queue<ManualResetValueTaskSource<object>>();
_awaitableCache = new Queue<ManualResetValueTaskSource<object>>();
}
// First attempt to reuse an existing awaitable in the queue
// to save allocating a new instance.
if (_awaitableCache.TryDequeue(out var awaitable))
{
// Reset previously used awaitable
Debug.Assert(awaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Previous awaitable should have been completed.");
awaitable.Reset();
}
else
{
awaitable = new ManualResetValueTaskSource<object>();
}
_awaitableQueue.Enqueue(awaitable);
return awaitable;
}
public override int ActiveCount => _awaitableQueue?.Count ?? 0;
}
/// <summary>
/// Provider has a single awaitable.
/// </summary>
internal class SingleAwaitableProvider : AwaitableProvider
{
private ManualResetValueTaskSource<object> _awaitable;
public override void CompleteCurrent()
{
_awaitable.TrySetResult(null);
}
public override ManualResetValueTaskSource<object> GetAwaitable()
{
if (_awaitable == null)
{
_awaitable = new ManualResetValueTaskSource<object>();
}
else
{
Debug.Assert(_awaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Previous awaitable should have been completed.");
_awaitable.Reset();
}
return _awaitable;
}
public override int ActiveCount => _awaitable != null && _awaitable.GetStatus() != ValueTaskSourceStatus.Succeeded ? 1 : 0;
}
}

View File

@ -3,37 +3,32 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Sources;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
internal class OutputFlowControl
{
private FlowControl _flow;
private Queue<OutputFlowControlAwaitable> _awaitableQueue;
private readonly AwaitableProvider _awaitableProvider;
public OutputFlowControl(uint initialWindowSize)
public OutputFlowControl(AwaitableProvider awaitableProvider, uint initialWindowSize)
{
_flow = new FlowControl(initialWindowSize);
_awaitableProvider = awaitableProvider;
}
public int Available => _flow.Available;
public bool IsAborted => _flow.IsAborted;
public OutputFlowControlAwaitable AvailabilityAwaitable
public ManualResetValueTaskSource<object> AvailabilityAwaitable
{
get
{
Debug.Assert(!_flow.IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort.");
Debug.Assert(_flow.Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available.");
if (_awaitableQueue == null)
{
_awaitableQueue = new Queue<OutputFlowControlAwaitable>();
}
var awaitable = new OutputFlowControlAwaitable();
_awaitableQueue.Enqueue(awaitable);
return awaitable;
return _awaitableProvider.GetAwaitable();
}
}
@ -42,7 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
// When output flow control is reused the client window size needs to be reset.
// The client might have changed the window size before the stream is reused.
_flow = new FlowControl(initialWindowSize);
Debug.Assert((_awaitableQueue?.Count ?? 0) == 0, "Queue should have been emptied by the previous stream.");
Debug.Assert(_awaitableProvider.ActiveCount == 0, "Queue should have been emptied by the previous stream.");
}
public void Advance(int bytes)
@ -57,9 +52,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
if (_flow.TryUpdateWindow(bytes))
{
while (_flow.Available > 0 && _awaitableQueue?.Count > 0)
while (_flow.Available > 0 && _awaitableProvider.ActiveCount > 0)
{
_awaitableQueue.Dequeue().Complete();
_awaitableProvider.CompleteCurrent();
}
return true;
@ -73,9 +68,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
// Make sure to set the aborted flag before running any continuations.
_flow.Abort();
while (_awaitableQueue?.Count > 0)
while (_awaitableProvider.ActiveCount > 0)
{
_awaitableQueue.Dequeue().Complete();
_awaitableProvider.CompleteCurrent();
}
}
}

View File

@ -1,49 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
internal class OutputFlowControlAwaitable : ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };
private Action _callback;
public OutputFlowControlAwaitable GetAwaiter() => this;
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
public void GetResult()
{
Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
_callback = null;
}
public void OnCompleted(Action continuation)
{
if (ReferenceEquals(_callback, _callbackCompleted) ||
ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
{
Task.Run(continuation);
}
}
public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}
public void Complete()
{
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
continuation?.Invoke();
}
}
}

View File

@ -4,6 +4,8 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
@ -12,12 +14,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
private readonly OutputFlowControl _connectionLevelFlowControl;
private readonly OutputFlowControl _streamLevelFlowControl;
private OutputFlowControlAwaitable _currentConnectionLevelAwaitable;
private ManualResetValueTaskSource<object> _currentConnectionLevelAwaitable;
private int _currentConnectionLevelAwaitableVersion;
public StreamOutputFlowControl(OutputFlowControl connectionLevelFlowControl, uint initialWindowSize)
{
_connectionLevelFlowControl = connectionLevelFlowControl;
_streamLevelFlowControl = new OutputFlowControl(initialWindowSize);
_streamLevelFlowControl = new OutputFlowControl(new SingleAwaitableProvider(), initialWindowSize);
}
public int Available => Math.Min(_connectionLevelFlowControl.Available, _streamLevelFlowControl.Available);
@ -29,8 +32,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
_streamLevelFlowControl.Reset(initialWindowSize);
if (_currentConnectionLevelAwaitable != null)
{
Debug.Assert(_currentConnectionLevelAwaitable.IsCompleted, "Should have been completed by the previous stream.");
Debug.Assert(_currentConnectionLevelAwaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Should have been completed by the previous stream.");
_currentConnectionLevelAwaitable = null;
_currentConnectionLevelAwaitableVersion = -1;
}
}
@ -40,7 +44,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
_streamLevelFlowControl.Advance(bytes);
}
public int AdvanceUpToAndWait(long bytes, out OutputFlowControlAwaitable awaitable)
public int AdvanceUpToAndWait(long bytes, out ValueTask<object> availabilityTask)
{
var leastAvailableFlow = _connectionLevelFlowControl.Available < _streamLevelFlowControl.Available
? _connectionLevelFlowControl : _streamLevelFlowControl;
@ -52,17 +56,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
_connectionLevelFlowControl.Advance(actual);
_streamLevelFlowControl.Advance(actual);
awaitable = null;
availabilityTask = default;
_currentConnectionLevelAwaitable = null;
_currentConnectionLevelAwaitableVersion = -1;
if (actual < bytes)
{
awaitable = leastAvailableFlow.AvailabilityAwaitable;
var awaitable = leastAvailableFlow.AvailabilityAwaitable;
if (leastAvailableFlow == _connectionLevelFlowControl)
{
_currentConnectionLevelAwaitable = awaitable;
_currentConnectionLevelAwaitableVersion = awaitable.Version;
}
availabilityTask = new ValueTask<object>(awaitable, awaitable.Version);
}
return actual;
@ -83,7 +91,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
// connection-level awaitable so the stream abort is observed immediately.
// This could complete an awaitable still sitting in the connection-level awaitable queue,
// but this is safe because completing it again will just no-op.
_currentConnectionLevelAwaitable?.Complete();
if (_currentConnectionLevelAwaitable != null &&
_currentConnectionLevelAwaitable.Version == _currentConnectionLevelAwaitableVersion)
{
_currentConnectionLevelAwaitable.SetResult(null);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]

View File

@ -39,7 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly int _minAllocBufferSize;
private readonly HPackDecoder _hpackDecoder;
private readonly InputFlowControl _inputFlowControl;
private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize);
private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(new MultipleAwaitableProvider(), Http2PeerSettings.DefaultInitialWindowSize);
private readonly Http2PeerSettings _serverSettings = new Http2PeerSettings();
private readonly Http2PeerSettings _clientSettings = new Http2PeerSettings();

View File

@ -365,7 +365,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
while (dataLength > 0)
{
OutputFlowControlAwaitable availabilityAwaitable;
ValueTask<object> availabilityTask;
var writeTask = default(ValueTask<FlushResult>);
lock (_writeLock)
@ -375,7 +375,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
break;
}
var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable);
var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityTask);
if (actual > 0)
{
@ -405,7 +405,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
// Avoid timing writes that are already complete. This is likely to happen during the last iteration.
if (availabilityAwaitable == null && writeTask.IsCompleted)
if (availabilityTask.IsCompleted && writeTask.IsCompleted)
{
continue;
}
@ -417,9 +417,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
// This awaitable releases continuations in FIFO order when the window updates.
// It should be very rare for a continuation to run without any availability.
if (availabilityAwaitable != null)
if (!availabilityTask.IsCompleted)
{
await availabilityAwaitable;
await availabilityTask;
}
flushResult = await writeTask;

View File

@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
new NullPipeWriter(),
connectionContext: null,
http2Connection: null,
new OutputFlowControl(initialWindowSize: uint.MaxValue),
new OutputFlowControl(new SingleAwaitableProvider(), initialWindowSize: uint.MaxValue),
timeoutControl: null,
minResponseDataRate: null,
"TestConnectionId",

View File

@ -26,6 +26,204 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
public class Http2ConnectionTests : Http2TestBase
{
[Fact]
public async Task FlowControl_ParallelStreams_FirstInFirstOutOrder()
{
var writeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
await InitializeConnectionAsync(async c =>
{
// Send headers
await c.Response.Body.FlushAsync();
// Send large data (3 larger than window size)
var writeTask = c.Response.Body.WriteAsync(new byte[65538]);
// Notify test that write has started
writeTcs.SetResult(null);
// Wait for write to complete
await writeTask;
});
await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
// Ensure the stream window size is large enough
await SendWindowUpdateAsync(streamId: 1, 65538);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 33,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
await writeTcs.Task;
await ExpectAsync(Http2FrameType.DATA,
withLength: 16384,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 16384,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 16384,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 16383,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
// 3 byte is remaining on stream 1
writeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
await StartStreamAsync(3, _browserRequestHeaders, endStream: true);
// Ensure the stream window size is large enough
await SendWindowUpdateAsync(streamId: 3, 65538);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 33,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 3);
await writeTcs.Task;
await SendWindowUpdateAsync(streamId: 0, 1);
// FIFO means stream 1 returns data first
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendWindowUpdateAsync(streamId: 0, 1);
// Stream 3 data
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 3);
await SendWindowUpdateAsync(streamId: 0, 1);
// Stream 1 data
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendWindowUpdateAsync(streamId: 0, 1);
// Stream 3 data
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 3);
await SendWindowUpdateAsync(streamId: 0, 1);
// Stream 1 data
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
// Stream 1 ends
await ExpectAsync(Http2FrameType.DATA,
withLength: 0,
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 1);
await SendWindowUpdateAsync(streamId: 0, 1);
// Stream 3 data
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 3);
await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);
}
[Fact]
public async Task FlowControl_OneStream_CorrectlyAwaited()
{
await InitializeConnectionAsync(async c =>
{
// Send headers
await c.Response.Body.FlushAsync();
// Send large data (1 larger than window size)
await c.Response.Body.WriteAsync(new byte[65540]);
});
// Ensure the connection window size is large enough
await SendWindowUpdateAsync(streamId: 0, 65537);
await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 33,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 16384,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 16384,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 16384,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 16383,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
// 2 bytes remaining
await SendWindowUpdateAsync(streamId: 1, 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendWindowUpdateAsync(streamId: 1, 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendWindowUpdateAsync(streamId: 1, 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendWindowUpdateAsync(streamId: 1, 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendWindowUpdateAsync(streamId: 1, 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 1,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 0,
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 1);
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
}
[Fact]
public async Task RequestHeaderStringReuse_MultipleStreams_KnownHeaderReused()
{

View File

@ -21,5 +21,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
void IValueTaskSource.GetResult(short token) => _core.GetResult(token);
public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags);
public ValueTaskSourceStatus GetStatus() => _core.GetStatus(_core.Version);
public void TrySetResult(T result)
{
if (_core.GetStatus(_core.Version) == ValueTaskSourceStatus.Pending)
{
_core.SetResult(result);
}
}
}
}