Revert the input pipe in the DuplexStreamPipeAdapter (#12204)

This commit is contained in:
Justin Kotalik 2019-07-16 07:44:37 -07:00 committed by GitHub
parent 496249fee9
commit 7f2a97a603
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 95 additions and 12 deletions

View File

@ -5,6 +5,7 @@ using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
@ -15,10 +16,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
/// <typeparam name="TStream"></typeparam>
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
private readonly Pipe _input;
private readonly Pipe _output;
private Task _inputTask;
private Task _outputTask;
private bool _disposed;
private readonly object _disposeLock = new object();
private readonly int _minAllocBufferSize;
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
@ -26,10 +30,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
base(duplexPipe.Input, duplexPipe.Output)
base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true)
{
Stream = createStream(this);
var inputOptions = new PipeOptions(pool: readerOptions.Pool,
readerScheduler: PipeScheduler.ThreadPool,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(),
useSynchronizationContext: false);
var outputOptions = new PipeOptions(pool: writerOptions.Pool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
@ -38,7 +50,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
minimumSegmentSize: writerOptions.MinimumBufferSize,
useSynchronizationContext: false);
Input = PipeReader.Create(Stream, readerOptions);
_minAllocBufferSize = writerOptions.MinimumBufferSize;
_input = new Pipe(inputOptions);
// We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
// about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
@ -50,7 +64,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public TStream Stream { get; }
public PipeReader Input { get; }
public PipeReader Input
{
get
{
if (_inputTask == null)
{
RunAsync();
}
return _input.Reader;
}
}
public PipeWriter Output
{
@ -58,35 +83,93 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
if (_outputTask == null)
{
_outputTask = WriteOutputAsync();
RunAsync();
}
return _output.Writer;
}
}
public override ValueTask DisposeAsync()
public void RunAsync()
{
_inputTask = ReadInputAsync();
_outputTask = WriteOutputAsync();
}
public override async ValueTask DisposeAsync()
{
lock (_disposeLock)
{
if (_disposed)
{
return default;
return;
}
_disposed = true;
}
Input.Complete();
_output.Writer.Complete();
_input.Reader.Complete();
if (_outputTask == null || _outputTask.IsCompletedSuccessfully)
if (_outputTask == null)
{
// Wait for the output task to complete, this ensures that we've copied
// the application data to the underlying stream
return default;
return;
}
return new ValueTask(_outputTask);
if (_outputTask != null)
{
await _outputTask;
}
CancelPendingRead();
if (_inputTask != null)
{
await _inputTask;
}
}
private async Task ReadInputAsync()
{
Exception error = null;
try
{
while (true)
{
var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize);
var bytesRead = await Stream.ReadAsync(outputBuffer);
_input.Writer.Advance(bytesRead);
if (bytesRead == 0)
{
// FIN
break;
}
var result = await _input.Writer.FlushAsync();
if (result.IsCompleted)
{
// flushResult should not be canceled.
break;
}
}
}
catch (OperationCanceledException ex)
{
// Propagate the exception if it's ConnectionAbortedException
error = ex as ConnectionAbortedException;
}
catch (Exception ex)
{
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
error = ex;
}
finally
{
_input.Writer.Complete(error);
}
}
private async Task WriteOutputAsync()