From cd6e6ae0bc461173601132d0f2b6eb8bf9d05912 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 10 Mar 2020 15:45:24 +1300 Subject: [PATCH] Reuse Http2OutputProducer.ProcessDataWrites task (#19695) --- .../src/Internal/Http2/Http2Connection.cs | 34 ++-- .../src/Internal/Http2/Http2OutputProducer.cs | 163 ++++++++++++------ .../Core/src/Internal/Http2/Http2Stream.cs | 7 +- .../Http2/Http2ConnectionTests.cs | 24 ++- .../ManualResetValueTaskSource.cs | 25 +++ 5 files changed, 184 insertions(+), 69 deletions(-) create mode 100644 src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs index 41d09b162a..7db15bb95b 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs @@ -55,7 +55,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private int _highestOpenedStreamId; private bool _gracefulCloseStarted; - private readonly Dictionary _streams = new Dictionary(); private int _clientActiveStreamCount = 0; private int _serverActiveStreamCount = 0; @@ -66,6 +65,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private int _isClosed; // Internal for testing + internal readonly Dictionary _streams = new Dictionary(); internal Http2StreamStack StreamPool; internal const int InitialStreamPoolSize = 5; @@ -304,6 +304,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 UpdateCompletedStreams(); } + while (StreamPool.TryPop(out var pooledStream)) + { + pooledStream.Dispose(); + } + // This cancels keep-alive and request header timeouts, but not the response drain timeout. TimeoutControl.CancelTimeout(); TimeoutControl.StartDrainTimeout(Limits.MinResponseDataRate, Limits.MaxResponseBufferSize); @@ -909,6 +914,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } catch (Http2StreamErrorException) { + _currentHeadersStream.Dispose(); ResetRequestHeaderParsingState(); throw; } @@ -1069,11 +1075,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 throw new Http2ConnectionErrorException(CoreStrings.FormatHttp2ErrorStreamClosed(_incomingFrame.Type, _incomingFrame.StreamId), Http2ErrorCode.STREAM_CLOSED); } - _streams.Remove(stream.StreamId); - if (stream.CanReuse) - { - ReturnStream(stream); - } + RemoveStream(stream); } else { @@ -1087,6 +1089,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } } + private void RemoveStream(Http2Stream stream) + { + _streams.Remove(stream.StreamId); + if (stream.CanReuse) + { + ReturnStream(stream); + } + else + { + stream.Dispose(); + } + } + // Compare to UpdateCompletedStreams, but only removes streams if over the max stream drain limit. private void MakeSpaceInDrainQueue() { @@ -1099,11 +1114,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _serverActiveStreamCount--; } - _streams.Remove(stream.StreamId); - if (stream.CanReuse) - { - ReturnStream(stream); - } + RemoveStream(stream); } } @@ -1460,7 +1471,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 Unknown = 0x40000000 } - private static class GracefulCloseInitiator { public const int None = 0; diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs index d129540342..c1c8a320f5 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs @@ -7,6 +7,7 @@ using System.Diagnostics; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Sources; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; @@ -17,7 +18,7 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { - internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter + internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter, IValueTaskSource, IDisposable { private int StreamId => _stream.StreamId; private readonly Http2FrameWriter _frameWriter; @@ -33,14 +34,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly Pipe _pipe; private readonly ConcurrentPipeWriter _pipeWriter; private readonly PipeReader _pipeReader; - private ValueTask _dataWriteProcessingTask; + private readonly ManualResetValueTaskSource _resetAwaitable = new ManualResetValueTaskSource(); + private IMemoryOwner _fakeMemoryOwner; private bool _startedWritingDataFrames; private bool _streamCompleted; private bool _suffixSent; private bool _streamEnded; private bool _writerComplete; + private bool _disposed; - private IMemoryOwner _fakeMemoryOwner; + // Internal for testing + internal ValueTask _dataWriteProcessingTask; + + /// The core logic for the IValueTaskSource implementation. + private ManualResetValueTaskSourceCore _responseCompleteTaskSource = new ManualResetValueTaskSourceCore { RunContinuationsAsynchronously = true }; // mutable struct, do not make this readonly + + // This object is itself usable as a backing source for ValueTask. Since there's only ever one awaiter + // for this object's state transitions at a time, we allow the object to be awaited directly. All functionality + // associated with the implementation is just delegated to the ManualResetValueTaskSourceCore. + private ValueTask GetWaiterTask() => new ValueTask(this, _responseCompleteTaskSource.Version); + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _responseCompleteTaskSource.GetStatus(token); + void IValueTaskSource.OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _responseCompleteTaskSource.OnCompleted(continuation, state, token, flags); + FlushResult IValueTaskSource.GetResult(short token) => _responseCompleteTaskSource.GetResult(token); public Http2OutputProducer(Http2Stream stream, Http2StreamContext context, StreamOutputFlowControl flowControl) { @@ -64,7 +79,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 public void StreamReset() { - Debug.Assert(_dataWriteProcessingTask.IsCompletedSuccessfully); + // Data background task must still be running. + Debug.Assert(!_dataWriteProcessingTask.IsCompleted); + // Response should have been completed. + Debug.Assert(_responseCompleteTaskSource.GetStatus(_responseCompleteTaskSource.Version) == ValueTaskSourceStatus.Succeeded); _streamEnded = false; _suffixSent = false; @@ -75,8 +93,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _pipe.Reset(); _pipeWriter.Reset(); + _responseCompleteTaskSource.Reset(); - _dataWriteProcessingTask = ProcessDataWrites(); + // Trigger the data process task to resume + _resetAwaitable.SetResult(null); } public void Complete() @@ -222,14 +242,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { if (_streamCompleted) { - return _dataWriteProcessingTask; + return GetWaiterTask(); } _streamCompleted = true; _suffixSent = true; _pipeWriter.Complete(); - return _dataWriteProcessingTask; + return GetWaiterTask(); } } @@ -370,68 +390,90 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { } - private async ValueTask ProcessDataWrites() + private async ValueTask ProcessDataWrites() { - FlushResult flushResult = default; - try + // ProcessDataWrites runs for the lifetime of the Http2OutputProducer, and is designed to be reused by multiple streams. + // When Http2OutputProducer is no longer used (e.g. a stream is aborted and will no longer be used, or the connection is closed) + // it should be disposed so ProcessDataWrites exits. Not disposing won't cause a memory leak in release builds, but in debug + // builds active tasks are rooted on Task.s_currentActiveTasks. Dispose could be removed in the future when active tasks are + // tracked by a weak reference. See https://github.com/dotnet/runtime/issues/26565 + do { - ReadResult readResult; - - do + FlushResult flushResult = default; + ReadResult readResult = default; + try { - readResult = await _pipeReader.ReadAsync(); - if (readResult.IsCanceled) + do { - // Response body is aborted, break and complete reader. - break; - } - else if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0) - { - // Output is ending and there are trailers to write - // Write any remaining content then write trailers - if (readResult.Buffer.Length > 0) + readResult = await _pipeReader.ReadAsync(); + + if (readResult.IsCanceled) { - // Only flush if required (i.e. content length exceeds flow control availability) - // Writing remaining content without flushing allows content and trailers to be sent in the same packet - await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream: false, forceFlush: false); + // Response body is aborted, break and complete reader. + break; } - - _stream.ResponseTrailers.SetReadOnly(); - _stream.DecrementActiveClientStreamCount(); - flushResult = await _frameWriter.WriteResponseTrailers(StreamId, _stream.ResponseTrailers); - } - else if (readResult.IsCompleted && _streamEnded) - { - if (readResult.Buffer.Length != 0) + else if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0) { - ThrowUnexpectedState(); - } + // Output is ending and there are trailers to write + // Write any remaining content then write trailers + if (readResult.Buffer.Length > 0) + { + // Only flush if required (i.e. content length exceeds flow control availability) + // Writing remaining content without flushing allows content and trailers to be sent in the same packet + await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream: false, forceFlush: false); + } - // Headers have already been written and there is no other content to write - flushResult = await _frameWriter.FlushAsync(outputAborter: null, cancellationToken: default); - } - else - { - var endStream = readResult.IsCompleted; - if (endStream) - { + _stream.ResponseTrailers.SetReadOnly(); _stream.DecrementActiveClientStreamCount(); + flushResult = await _frameWriter.WriteResponseTrailers(StreamId, _stream.ResponseTrailers); } - flushResult = await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream, forceFlush: true); - } + else if (readResult.IsCompleted && _streamEnded) + { + if (readResult.Buffer.Length != 0) + { + ThrowUnexpectedState(); + } - _pipeReader.AdvanceTo(readResult.Buffer.End); - } while (!readResult.IsCompleted); - } - catch (Exception ex) - { - _log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception."); - } + // Headers have already been written and there is no other content to write + flushResult = await _frameWriter.FlushAsync(outputAborter: null, cancellationToken: default); + } + else + { + var endStream = readResult.IsCompleted; + if (endStream) + { + _stream.DecrementActiveClientStreamCount(); + } + flushResult = await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream, forceFlush: true); + } - _pipeReader.Complete(); + _pipeReader.AdvanceTo(readResult.Buffer.End); + } while (!readResult.IsCompleted); + } + catch (Exception ex) + { + _log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception."); + } - return flushResult; + _pipeReader.Complete(); + + // Signal via WriteStreamSuffixAsync to the stream that output has finished. + // Stream state will move to RequestProcessingStatus.ResponseCompleted + _responseCompleteTaskSource.SetResult(flushResult); + + if (readResult.IsCompleted) + { + // Successfully read all data. Wait here for the stream to be reset. + await new ValueTask(_resetAwaitable, _resetAwaitable.Version); + _resetAwaitable.Reset(); + } + else + { + // Stream was aborted. + break; + } + } while (!_disposed); static void ThrowUnexpectedState() { @@ -486,5 +528,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 useSynchronizationContext: false, minimumSegmentSize: pool.GetMinimumSegmentSize() )); + + public void Dispose() + { + if (_disposed) + { + return; + } + _disposed = true; + + _resetAwaitable.SetResult(null); + } } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs index ee97bec12f..b46dc0f0f6 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs @@ -17,7 +17,7 @@ using Microsoft.Net.Http.Headers; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { - internal abstract partial class Http2Stream : HttpProtocol, IThreadPoolWorkItem + internal abstract partial class Http2Stream : HttpProtocol, IThreadPoolWorkItem, IDisposable { private Http2StreamContext _context; private Http2OutputProducer _http2Output; @@ -588,6 +588,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 /// public abstract void Execute(); + public void Dispose() + { + _http2Output.Dispose(); + } + [Flags] private enum StreamCompletionFlags { diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs index b0595d4545..f29cc35df5 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs @@ -76,12 +76,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests [Fact] public async Task StreamPool_SingleStream_ReturnedToPool() { - await InitializeConnectionAsync(_echoApplication); + var serverTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await InitializeConnectionAsync(async context => + { + await serverTcs.Task; + await _echoApplication(context); + }); Assert.Equal(0, _connection.StreamPool.Count); await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + var stream = _connection._streams[1]; + serverTcs.SetResult(null); + await ExpectAsync(Http2FrameType.HEADERS, withLength: 37, withFlags: (byte)(Http2HeadersFrameFlags.END_HEADERS | Http2HeadersFrameFlags.END_STREAM), @@ -98,6 +107,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests Assert.Equal(1, _connection.StreamPool.Count); await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + var output = (Http2OutputProducer)stream.Output; + await output._dataWriteProcessingTask; } [Fact] @@ -216,14 +228,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests [QuarantinedTest] public async Task StreamPool_StreamIsInvalidState_DontReturnedToPool() { + var serverTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await InitializeConnectionAsync(async context => { + await serverTcs.Task; + await context.Response.WriteAsync("Content"); throw new InvalidOperationException("Put the stream into an invalid state by throwing after writing to response."); }); await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + var stream = _connection._streams[1]; + serverTcs.SetResult(null); + await ExpectAsync(Http2FrameType.HEADERS, withLength: 33, withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, @@ -244,6 +263,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests // Stream is not returned to the pool Assert.Equal(0, _connection.StreamPool.Count); + var output = (Http2OutputProducer)stream.Output; + await output._dataWriteProcessingTask; + await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); } diff --git a/src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs b/src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs new file mode 100644 index 0000000000..f11d2d50bd --- /dev/null +++ b/src/Shared/ServerInfrastructure/ManualResetValueTaskSource.cs @@ -0,0 +1,25 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading.Tasks.Sources; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal +{ + internal sealed class ManualResetValueTaskSource : IValueTaskSource, IValueTaskSource + { + private ManualResetValueTaskSourceCore _core; // mutable struct; do not make this readonly + + public bool RunContinuationsAsynchronously { get => _core.RunContinuationsAsynchronously; set => _core.RunContinuationsAsynchronously = value; } + public short Version => _core.Version; + public void Reset() => _core.Reset(); + public void SetResult(T result) => _core.SetResult(result); + public void SetException(Exception error) => _core.SetException(error); + + public T GetResult(short token) => _core.GetResult(token); + void IValueTaskSource.GetResult(short token) => _core.GetResult(token); + public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token); + public void OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags); + } +}