diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs
index 77f4bfd24e..592eeb67c7 100644
--- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs
+++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs
@@ -5,6 +5,7 @@ using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
+using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
@@ -15,10 +16,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
///
internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
+ private readonly Pipe _input;
private readonly Pipe _output;
+ private Task _inputTask;
private Task _outputTask;
private bool _disposed;
private readonly object _disposeLock = new object();
+ private readonly int _minAllocBufferSize;
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) :
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
@@ -26,10 +30,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) :
- base(duplexPipe.Input, duplexPipe.Output)
+ base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true)
{
Stream = createStream(this);
+ var inputOptions = new PipeOptions(pool: readerOptions.Pool,
+ readerScheduler: PipeScheduler.ThreadPool,
+ writerScheduler: PipeScheduler.Inline,
+ pauseWriterThreshold: 1,
+ resumeWriterThreshold: 1,
+ minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(),
+ useSynchronizationContext: false);
+
var outputOptions = new PipeOptions(pool: writerOptions.Pool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
@@ -38,7 +50,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
minimumSegmentSize: writerOptions.MinimumBufferSize,
useSynchronizationContext: false);
- Input = PipeReader.Create(Stream, readerOptions);
+ _minAllocBufferSize = writerOptions.MinimumBufferSize;
+
+ _input = new Pipe(inputOptions);
// We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
// about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
@@ -50,7 +64,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public TStream Stream { get; }
- public PipeReader Input { get; }
+ public PipeReader Input
+ {
+ get
+ {
+ if (_inputTask == null)
+ {
+ RunAsync();
+ }
+
+ return _input.Reader;
+ }
+ }
public PipeWriter Output
{
@@ -58,35 +83,93 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
if (_outputTask == null)
{
- _outputTask = WriteOutputAsync();
+ RunAsync();
}
return _output.Writer;
}
}
- public override ValueTask DisposeAsync()
+ public void RunAsync()
+ {
+ _inputTask = ReadInputAsync();
+ _outputTask = WriteOutputAsync();
+ }
+
+ public override async ValueTask DisposeAsync()
{
lock (_disposeLock)
{
if (_disposed)
{
- return default;
+ return;
}
_disposed = true;
}
- Input.Complete();
_output.Writer.Complete();
+ _input.Reader.Complete();
- if (_outputTask == null || _outputTask.IsCompletedSuccessfully)
+ if (_outputTask == null)
{
- // Wait for the output task to complete, this ensures that we've copied
- // the application data to the underlying stream
- return default;
+ return;
}
- return new ValueTask(_outputTask);
+ if (_outputTask != null)
+ {
+ await _outputTask;
+ }
+
+ CancelPendingRead();
+
+ if (_inputTask != null)
+ {
+ await _inputTask;
+ }
+ }
+
+ private async Task ReadInputAsync()
+ {
+ Exception error = null;
+ try
+ {
+ while (true)
+ {
+ var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize);
+
+ var bytesRead = await Stream.ReadAsync(outputBuffer);
+ _input.Writer.Advance(bytesRead);
+
+ if (bytesRead == 0)
+ {
+ // FIN
+ break;
+ }
+
+ var result = await _input.Writer.FlushAsync();
+
+ if (result.IsCompleted)
+ {
+ // flushResult should not be canceled.
+ break;
+ }
+ }
+
+ }
+ catch (OperationCanceledException ex)
+ {
+ // Propagate the exception if it's ConnectionAbortedException
+ error = ex as ConnectionAbortedException;
+ }
+ catch (Exception ex)
+ {
+ // Don't rethrow the exception. It should be handled by the Pipeline consumer.
+ error = ex;
+ }
+ finally
+ {
+ _input.Writer.Complete(error);
+ }
}
private async Task WriteOutputAsync()