diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/AwaitableProvider.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/AwaitableProvider.cs new file mode 100644 index 0000000000..c765b22824 --- /dev/null +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/AwaitableProvider.cs @@ -0,0 +1,92 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks.Sources; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl +{ + internal abstract class AwaitableProvider + { + public abstract ManualResetValueTaskSource GetAwaitable(); + public abstract void CompleteCurrent(); + public abstract int ActiveCount { get; } + } + + /// + /// Provider returns multiple awaitables. Awaitables are completed FIFO. + /// + internal class MultipleAwaitableProvider : AwaitableProvider + { + private Queue> _awaitableQueue; + private Queue> _awaitableCache; + + public override void CompleteCurrent() + { + var awaitable = _awaitableQueue.Dequeue(); + awaitable.TrySetResult(null); + + // Add completed awaitable to the cache for reuse + _awaitableCache.Enqueue(awaitable); + } + + public override ManualResetValueTaskSource GetAwaitable() + { + if (_awaitableQueue == null) + { + _awaitableQueue = new Queue>(); + _awaitableCache = new Queue>(); + } + + // First attempt to reuse an existing awaitable in the queue + // to save allocating a new instance. + if (_awaitableCache.TryDequeue(out var awaitable)) + { + // Reset previously used awaitable + Debug.Assert(awaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Previous awaitable should have been completed."); + awaitable.Reset(); + } + else + { + awaitable = new ManualResetValueTaskSource(); + } + + _awaitableQueue.Enqueue(awaitable); + + return awaitable; + } + + public override int ActiveCount => _awaitableQueue?.Count ?? 0; + } + + /// + /// Provider has a single awaitable. + /// + internal class SingleAwaitableProvider : AwaitableProvider + { + private ManualResetValueTaskSource _awaitable; + + public override void CompleteCurrent() + { + _awaitable.TrySetResult(null); + } + + public override ManualResetValueTaskSource GetAwaitable() + { + if (_awaitable == null) + { + _awaitable = new ManualResetValueTaskSource(); + } + else + { + Debug.Assert(_awaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Previous awaitable should have been completed."); + _awaitable.Reset(); + } + + return _awaitable; + } + + public override int ActiveCount => _awaitable != null && _awaitable.GetStatus() != ValueTaskSourceStatus.Succeeded ? 1 : 0; + } +} diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs index c5ccd92b1c..198aacb46b 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs @@ -3,37 +3,32 @@ using System.Collections.Generic; using System.Diagnostics; +using System.Threading.Tasks.Sources; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl { internal class OutputFlowControl { private FlowControl _flow; - private Queue _awaitableQueue; + private readonly AwaitableProvider _awaitableProvider; - public OutputFlowControl(uint initialWindowSize) + public OutputFlowControl(AwaitableProvider awaitableProvider, uint initialWindowSize) { _flow = new FlowControl(initialWindowSize); + _awaitableProvider = awaitableProvider; } public int Available => _flow.Available; public bool IsAborted => _flow.IsAborted; - public OutputFlowControlAwaitable AvailabilityAwaitable + public ManualResetValueTaskSource AvailabilityAwaitable { get { Debug.Assert(!_flow.IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort."); Debug.Assert(_flow.Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available."); - if (_awaitableQueue == null) - { - _awaitableQueue = new Queue(); - } - - var awaitable = new OutputFlowControlAwaitable(); - _awaitableQueue.Enqueue(awaitable); - return awaitable; + return _awaitableProvider.GetAwaitable(); } } @@ -42,7 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl // When output flow control is reused the client window size needs to be reset. // The client might have changed the window size before the stream is reused. _flow = new FlowControl(initialWindowSize); - Debug.Assert((_awaitableQueue?.Count ?? 0) == 0, "Queue should have been emptied by the previous stream."); + Debug.Assert(_awaitableProvider.ActiveCount == 0, "Queue should have been emptied by the previous stream."); } public void Advance(int bytes) @@ -57,9 +52,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl { if (_flow.TryUpdateWindow(bytes)) { - while (_flow.Available > 0 && _awaitableQueue?.Count > 0) + while (_flow.Available > 0 && _awaitableProvider.ActiveCount > 0) { - _awaitableQueue.Dequeue().Complete(); + _awaitableProvider.CompleteCurrent(); } return true; @@ -73,9 +68,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl // Make sure to set the aborted flag before running any continuations. _flow.Abort(); - while (_awaitableQueue?.Count > 0) + while (_awaitableProvider.ActiveCount > 0) { - _awaitableQueue.Dequeue().Complete(); + _awaitableProvider.CompleteCurrent(); } } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControlAwaitable.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControlAwaitable.cs deleted file mode 100644 index c691a22ed7..0000000000 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControlAwaitable.cs +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl -{ - internal class OutputFlowControlAwaitable : ICriticalNotifyCompletion - { - private static readonly Action _callbackCompleted = () => { }; - - private Action _callback; - - public OutputFlowControlAwaitable GetAwaiter() => this; - public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); - - public void GetResult() - { - Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); - - _callback = null; - } - - public void OnCompleted(Action continuation) - { - if (ReferenceEquals(_callback, _callbackCompleted) || - ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) - { - Task.Run(continuation); - } - } - - public void UnsafeOnCompleted(Action continuation) - { - OnCompleted(continuation); - } - - public void Complete() - { - var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); - - continuation?.Invoke(); - } - } -} diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs index 8781f163c6..792a1ca3a7 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs @@ -4,6 +4,8 @@ using System; using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Threading.Tasks; +using System.Threading.Tasks.Sources; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl { @@ -12,12 +14,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl private readonly OutputFlowControl _connectionLevelFlowControl; private readonly OutputFlowControl _streamLevelFlowControl; - private OutputFlowControlAwaitable _currentConnectionLevelAwaitable; + private ManualResetValueTaskSource _currentConnectionLevelAwaitable; + private int _currentConnectionLevelAwaitableVersion; public StreamOutputFlowControl(OutputFlowControl connectionLevelFlowControl, uint initialWindowSize) { _connectionLevelFlowControl = connectionLevelFlowControl; - _streamLevelFlowControl = new OutputFlowControl(initialWindowSize); + _streamLevelFlowControl = new OutputFlowControl(new SingleAwaitableProvider(), initialWindowSize); } public int Available => Math.Min(_connectionLevelFlowControl.Available, _streamLevelFlowControl.Available); @@ -29,8 +32,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl _streamLevelFlowControl.Reset(initialWindowSize); if (_currentConnectionLevelAwaitable != null) { - Debug.Assert(_currentConnectionLevelAwaitable.IsCompleted, "Should have been completed by the previous stream."); + Debug.Assert(_currentConnectionLevelAwaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Should have been completed by the previous stream."); _currentConnectionLevelAwaitable = null; + _currentConnectionLevelAwaitableVersion = -1; } } @@ -40,7 +44,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl _streamLevelFlowControl.Advance(bytes); } - public int AdvanceUpToAndWait(long bytes, out OutputFlowControlAwaitable awaitable) + public int AdvanceUpToAndWait(long bytes, out ValueTask availabilityTask) { var leastAvailableFlow = _connectionLevelFlowControl.Available < _streamLevelFlowControl.Available ? _connectionLevelFlowControl : _streamLevelFlowControl; @@ -52,17 +56,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl _connectionLevelFlowControl.Advance(actual); _streamLevelFlowControl.Advance(actual); - awaitable = null; + availabilityTask = default; _currentConnectionLevelAwaitable = null; + _currentConnectionLevelAwaitableVersion = -1; if (actual < bytes) { - awaitable = leastAvailableFlow.AvailabilityAwaitable; + var awaitable = leastAvailableFlow.AvailabilityAwaitable; if (leastAvailableFlow == _connectionLevelFlowControl) { _currentConnectionLevelAwaitable = awaitable; + _currentConnectionLevelAwaitableVersion = awaitable.Version; } + + availabilityTask = new ValueTask(awaitable, awaitable.Version); } return actual; @@ -83,7 +91,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl // connection-level awaitable so the stream abort is observed immediately. // This could complete an awaitable still sitting in the connection-level awaitable queue, // but this is safe because completing it again will just no-op. - _currentConnectionLevelAwaitable?.Complete(); + if (_currentConnectionLevelAwaitable != null && + _currentConnectionLevelAwaitable.Version == _currentConnectionLevelAwaitableVersion) + { + _currentConnectionLevelAwaitable.SetResult(null); + } } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs index 7db15bb95b..8666bebc64 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs @@ -39,7 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly int _minAllocBufferSize; private readonly HPackDecoder _hpackDecoder; private readonly InputFlowControl _inputFlowControl; - private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize); + private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(new MultipleAwaitableProvider(), Http2PeerSettings.DefaultInitialWindowSize); private readonly Http2PeerSettings _serverSettings = new Http2PeerSettings(); private readonly Http2PeerSettings _clientSettings = new Http2PeerSettings(); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs index 2d50e71d5f..fd8d553067 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs @@ -365,7 +365,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 while (dataLength > 0) { - OutputFlowControlAwaitable availabilityAwaitable; + ValueTask availabilityTask; var writeTask = default(ValueTask); lock (_writeLock) @@ -375,7 +375,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 break; } - var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable); + var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityTask); if (actual > 0) { @@ -405,7 +405,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } // Avoid timing writes that are already complete. This is likely to happen during the last iteration. - if (availabilityAwaitable == null && writeTask.IsCompleted) + if (availabilityTask.IsCompleted && writeTask.IsCompleted) { continue; } @@ -417,9 +417,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 // This awaitable releases continuations in FIFO order when the window updates. // It should be very rare for a continuation to run without any availability. - if (availabilityAwaitable != null) + if (!availabilityTask.IsCompleted) { - await availabilityAwaitable; + await availabilityTask; } flushResult = await writeTask; diff --git a/src/Servers/Kestrel/perf/Kestrel.Performance/Http2FrameWriterBenchmark.cs b/src/Servers/Kestrel/perf/Kestrel.Performance/Http2FrameWriterBenchmark.cs index fcf5476e93..839558d1a3 100644 --- a/src/Servers/Kestrel/perf/Kestrel.Performance/Http2FrameWriterBenchmark.cs +++ b/src/Servers/Kestrel/perf/Kestrel.Performance/Http2FrameWriterBenchmark.cs @@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance new NullPipeWriter(), connectionContext: null, http2Connection: null, - new OutputFlowControl(initialWindowSize: uint.MaxValue), + new OutputFlowControl(new SingleAwaitableProvider(), initialWindowSize: uint.MaxValue), timeoutControl: null, minResponseDataRate: null, "TestConnectionId", diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs index 7e8b5a41be..d8b71f9098 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs @@ -26,6 +26,204 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests { public class Http2ConnectionTests : Http2TestBase { + [Fact] + public async Task FlowControl_ParallelStreams_FirstInFirstOutOrder() + { + var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await InitializeConnectionAsync(async c => + { + // Send headers + await c.Response.Body.FlushAsync(); + + // Send large data (3 larger than window size) + var writeTask = c.Response.Body.WriteAsync(new byte[65538]); + + // Notify test that write has started + writeTcs.SetResult(null); + + // Wait for write to complete + await writeTask; + }); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + // Ensure the stream window size is large enough + await SendWindowUpdateAsync(streamId: 1, 65538); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 33, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + + await writeTcs.Task; + + await ExpectAsync(Http2FrameType.DATA, + withLength: 16384, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 16384, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 16384, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 16383, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + // 3 byte is remaining on stream 1 + + writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await StartStreamAsync(3, _browserRequestHeaders, endStream: true); + // Ensure the stream window size is large enough + await SendWindowUpdateAsync(streamId: 3, 65538); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 33, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 3); + + await writeTcs.Task; + + await SendWindowUpdateAsync(streamId: 0, 1); + + // FIFO means stream 1 returns data first + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(streamId: 0, 1); + + // Stream 3 data + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 3); + + await SendWindowUpdateAsync(streamId: 0, 1); + + // Stream 1 data + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(streamId: 0, 1); + + // Stream 3 data + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 3); + + await SendWindowUpdateAsync(streamId: 0, 1); + + // Stream 1 data + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + // Stream 1 ends + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 1); + + await SendWindowUpdateAsync(streamId: 0, 1); + + // Stream 3 data + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 3); + + await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false); + } + + [Fact] + public async Task FlowControl_OneStream_CorrectlyAwaited() + { + await InitializeConnectionAsync(async c => + { + // Send headers + await c.Response.Body.FlushAsync(); + + // Send large data (1 larger than window size) + await c.Response.Body.WriteAsync(new byte[65540]); + }); + + // Ensure the connection window size is large enough + await SendWindowUpdateAsync(streamId: 0, 65537); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 33, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 16384, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 16384, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 16384, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 16383, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + // 2 bytes remaining + + await SendWindowUpdateAsync(streamId: 1, 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(streamId: 1, 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(streamId: 1, 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(streamId: 1, 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(streamId: 1, 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 1, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 1); + + await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + } + [Fact] public async Task RequestHeaderStringReuse_MultipleStreams_KnownHeaderReused() { diff --git a/src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs b/src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs index f11d2d50bd..97284208e6 100644 --- a/src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs +++ b/src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs @@ -21,5 +21,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal void IValueTaskSource.GetResult(short token) => _core.GetResult(token); public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token); public void OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags); + + public ValueTaskSourceStatus GetStatus() => _core.GetStatus(_core.Version); + + public void TrySetResult(T result) + { + if (_core.GetStatus(_core.Version) == ValueTaskSourceStatus.Pending) + { + _core.SetResult(result); + } + } } }