From 7f2a97a603b0a83a55a654f49d0290e5eae530a4 Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Tue, 16 Jul 2019 07:44:37 -0700 Subject: [PATCH] Revert the input pipe in the DuplexStreamPipeAdapter (#12204) --- .../Internal/DuplexPipeStreamAdapter.cs | 107 ++++++++++++++++-- 1 file changed, 95 insertions(+), 12 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 77f4bfd24e..592eeb67c7 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -5,6 +5,7 @@ using System; using System.IO; using System.IO.Pipelines; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal @@ -15,10 +16,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { + private readonly Pipe _input; private readonly Pipe _output; + private Task _inputTask; private Task _outputTask; private bool _disposed; private readonly object _disposeLock = new object(); + private readonly int _minAllocBufferSize; public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) @@ -26,10 +30,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : - base(duplexPipe.Input, duplexPipe.Output) + base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true) { Stream = createStream(this); + var inputOptions = new PipeOptions(pool: readerOptions.Pool, + readerScheduler: PipeScheduler.ThreadPool, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(), + useSynchronizationContext: false); + var outputOptions = new PipeOptions(pool: writerOptions.Pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, @@ -38,7 +50,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal minimumSegmentSize: writerOptions.MinimumBufferSize, useSynchronizationContext: false); - Input = PipeReader.Create(Stream, readerOptions); + _minAllocBufferSize = writerOptions.MinimumBufferSize; + + _input = new Pipe(inputOptions); // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions // about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once @@ -50,7 +64,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal public TStream Stream { get; } - public PipeReader Input { get; } + public PipeReader Input + { + get + { + if (_inputTask == null) + { + RunAsync(); + } + + return _input.Reader; + } + } public PipeWriter Output { @@ -58,35 +83,93 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { if (_outputTask == null) { - _outputTask = WriteOutputAsync(); + RunAsync(); } return _output.Writer; } } - public override ValueTask DisposeAsync() + public void RunAsync() + { + _inputTask = ReadInputAsync(); + _outputTask = WriteOutputAsync(); + } + + public override async ValueTask DisposeAsync() { lock (_disposeLock) { if (_disposed) { - return default; + return; } _disposed = true; } - Input.Complete(); _output.Writer.Complete(); + _input.Reader.Complete(); - if (_outputTask == null || _outputTask.IsCompletedSuccessfully) + if (_outputTask == null) { - // Wait for the output task to complete, this ensures that we've copied - // the application data to the underlying stream - return default; + return; } - return new ValueTask(_outputTask); + if (_outputTask != null) + { + await _outputTask; + } + + CancelPendingRead(); + + if (_inputTask != null) + { + await _inputTask; + } + } + + private async Task ReadInputAsync() + { + Exception error = null; + try + { + while (true) + { + var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize); + + var bytesRead = await Stream.ReadAsync(outputBuffer); + _input.Writer.Advance(bytesRead); + + if (bytesRead == 0) + { + // FIN + break; + } + + var result = await _input.Writer.FlushAsync(); + + if (result.IsCompleted) + { + // flushResult should not be canceled. + break; + } + } + + } + catch (OperationCanceledException ex) + { + // Propagate the exception if it's ConnectionAbortedException + error = ex as ConnectionAbortedException; + } + catch (Exception ex) + { + // Don't rethrow the exception. It should be handled by the Pipeline consumer. + error = ex; + } + finally + { + _input.Writer.Complete(error); + } } private async Task WriteOutputAsync()