From 02d28e1c54e1264e713e8c3e0f77589c46f73ecf Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Fri, 2 Aug 2019 08:02:59 -0700 Subject: [PATCH] Only use read Pipe when running on Http2. (#12737) --- .../src/Internal/Http2/Http2Connection.cs | 70 +++++++++++++- .../Internal/DuplexPipeStreamAdapter.cs | 92 ++----------------- .../Http2/Http2TimeoutTests.cs | 1 - 3 files changed, 77 insertions(+), 86 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs index 4bfba159dd..3ccdccef69 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs @@ -45,6 +45,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly HttpConnectionContext _context; private readonly Http2FrameWriter _frameWriter; + private readonly Pipe _input; + private Task _inputTask; + private readonly int _minAllocBufferSize; private readonly HPackDecoder _hpackDecoder; private readonly InputFlowControl _inputFlowControl; private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize); @@ -91,6 +94,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 context.MemoryPool, context.ServiceContext.Log); + var inputOptions = new PipeOptions(pool: context.MemoryPool, + readerScheduler: context.ServiceContext.Scheduler, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + minimumSegmentSize: context.MemoryPool.GetMinimumSegmentSize(), + useSynchronizationContext: false); + + _input = new Pipe(inputOptions); + _minAllocBufferSize = context.MemoryPool.GetMinimumAllocSize(); + _hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize); var connectionWindow = (uint)http2Limits.InitialConnectionWindowSize; @@ -101,10 +115,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _serverSettings.HeaderTableSize = (uint)http2Limits.HeaderTableSize; _serverSettings.MaxHeaderListSize = (uint)httpLimits.MaxRequestHeadersTotalSize; _serverSettings.InitialWindowSize = (uint)http2Limits.InitialStreamWindowSize; + _inputTask = ReadInputAsync(); } public string ConnectionId => _context.ConnectionId; - public PipeReader Input => _context.Transport.Input; + + public PipeReader Input => _input.Reader; + public IKestrelTrace Log => _context.ServiceContext.Log; public IFeatureCollection ConnectionFeatures => _context.ConnectionFeatures; public ISystemClock SystemClock => _context.ServiceContext.SystemClock; @@ -312,6 +329,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 finally { Input.Complete(); + _context.Transport.Input.CancelPendingRead(); + await _inputTask; } } } @@ -1261,6 +1280,55 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 Interlocked.Decrement(ref _clientActiveStreamCount); } + private async Task ReadInputAsync() + { + Exception error = null; + try + { + while (true) + { + var reader = _context.Transport.Input; + var writer = _input.Writer; + + var readResult = await reader.ReadAsync(); + + if ((readResult.IsCompleted && readResult.Buffer.Length == 0) || readResult.IsCanceled) + { + // FIN + break; + } + + var outputBuffer = writer.GetMemory(_minAllocBufferSize); + + var copyAmount = (int)Math.Min(outputBuffer.Length, readResult.Buffer.Length); + var bufferSlice = readResult.Buffer.Slice(0, copyAmount); + + bufferSlice.CopyTo(outputBuffer.Span); + + reader.AdvanceTo(bufferSlice.End); + writer.Advance(copyAmount); + + var result = await writer.FlushAsync(); + + if (result.IsCompleted || result.IsCanceled) + { + // flushResult should not be canceled. + break; + } + } + } + catch (Exception ex) + { + // Don't rethrow the exception. It should be handled by the Pipeline consumer. + error = ex; + } + finally + { + await _context.Transport.Input.CompleteAsync(); + _input.Writer.Complete(error); + } + } + private class StreamCloseAwaitable : ICriticalNotifyCompletion { private static readonly Action _callbackCompleted = () => { }; diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 929446f502..ee310c1e52 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -5,7 +5,6 @@ using System; using System.IO; using System.IO.Pipelines; using System.Threading.Tasks; -using Microsoft.AspNetCore.Connections; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { @@ -15,11 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { - private readonly Pipe _input; - private Task _inputTask; private bool _disposed; private readonly object _disposeLock = new object(); - private readonly int _minAllocBufferSize; public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) @@ -27,38 +23,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : - base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true) + base(duplexPipe.Input, duplexPipe.Output) { - Stream = createStream(this); - - var inputOptions = new PipeOptions(pool: readerOptions.Pool, - readerScheduler: PipeScheduler.ThreadPool, - writerScheduler: PipeScheduler.Inline, - pauseWriterThreshold: 1, - resumeWriterThreshold: 1, - minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(), - useSynchronizationContext: false); - - _minAllocBufferSize = writerOptions.MinimumBufferSize; - - _input = new Pipe(inputOptions); - Output = PipeWriter.Create(Stream, writerOptions); + var stream = createStream(this); + Stream = stream; + Input = PipeReader.Create(stream, readerOptions); + Output = PipeWriter.Create(stream, writerOptions); } public TStream Stream { get; } - public PipeReader Input - { - get - { - if (_inputTask == null) - { - _inputTask = ReadInputAsync(); - } - - return _input.Reader; - } - } + public PipeReader Input { get; } public PipeWriter Output { get; } @@ -73,65 +48,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal _disposed = true; } - _input.Reader.Complete(); - Output.Complete(); - - CancelPendingRead(); - - if (_inputTask != null) - { - await _inputTask; - } + await Input.CompleteAsync(); + await Output.CompleteAsync(); } protected override void Dispose(bool disposing) { throw new NotSupportedException(); } - - private async Task ReadInputAsync() - { - Exception error = null; - try - { - while (true) - { - var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize); - - var bytesRead = await Stream.ReadAsync(outputBuffer); - _input.Writer.Advance(bytesRead); - - if (bytesRead == 0) - { - // FIN - break; - } - - var result = await _input.Writer.FlushAsync(); - - if (result.IsCompleted) - { - // flushResult should not be canceled. - break; - } - } - - } - catch (OperationCanceledException ex) - { - // Propagate the exception if it's ConnectionAbortedException - error = ex as ConnectionAbortedException; - } - catch (Exception ex) - { - // Don't rethrow the exception. It should be handled by the Pipeline consumer. - error = ex; - } - finally - { - _input.Writer.Complete(error); - } - } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs index 90a84bea2e..cb41f2c2e5 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs @@ -190,7 +190,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests } [Theory] - [Repeat(20)] [InlineData((int)Http2FrameType.DATA)] [InlineData((int)Http2FrameType.CONTINUATION)] public async Task AbortedStream_ResetsAndDrainsRequest_RefusesFramesAfterCooldownExpires(int intFinalFrameType)