diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs
index 4bfba159dd..3ccdccef69 100644
--- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs
+++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs
@@ -45,6 +45,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly HttpConnectionContext _context;
private readonly Http2FrameWriter _frameWriter;
+ private readonly Pipe _input;
+ private Task _inputTask;
+ private readonly int _minAllocBufferSize;
private readonly HPackDecoder _hpackDecoder;
private readonly InputFlowControl _inputFlowControl;
private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize);
@@ -91,6 +94,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
context.MemoryPool,
context.ServiceContext.Log);
+ var inputOptions = new PipeOptions(pool: context.MemoryPool,
+ readerScheduler: context.ServiceContext.Scheduler,
+ writerScheduler: PipeScheduler.Inline,
+ pauseWriterThreshold: 1,
+ resumeWriterThreshold: 1,
+ minimumSegmentSize: context.MemoryPool.GetMinimumSegmentSize(),
+ useSynchronizationContext: false);
+
+ _input = new Pipe(inputOptions);
+ _minAllocBufferSize = context.MemoryPool.GetMinimumAllocSize();
+
_hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize);
var connectionWindow = (uint)http2Limits.InitialConnectionWindowSize;
@@ -101,10 +115,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_serverSettings.HeaderTableSize = (uint)http2Limits.HeaderTableSize;
_serverSettings.MaxHeaderListSize = (uint)httpLimits.MaxRequestHeadersTotalSize;
_serverSettings.InitialWindowSize = (uint)http2Limits.InitialStreamWindowSize;
+ _inputTask = ReadInputAsync();
}
public string ConnectionId => _context.ConnectionId;
- public PipeReader Input => _context.Transport.Input;
+
+ public PipeReader Input => _input.Reader;
+
public IKestrelTrace Log => _context.ServiceContext.Log;
public IFeatureCollection ConnectionFeatures => _context.ConnectionFeatures;
public ISystemClock SystemClock => _context.ServiceContext.SystemClock;
@@ -312,6 +329,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
finally
{
Input.Complete();
+ _context.Transport.Input.CancelPendingRead();
+ await _inputTask;
}
}
}
@@ -1261,6 +1280,55 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
Interlocked.Decrement(ref _clientActiveStreamCount);
}
+ private async Task ReadInputAsync()
+ {
+ Exception error = null;
+ try
+ {
+ while (true)
+ {
+ var reader = _context.Transport.Input;
+ var writer = _input.Writer;
+
+ var readResult = await reader.ReadAsync();
+
+ if ((readResult.IsCompleted && readResult.Buffer.Length == 0) || readResult.IsCanceled)
+ {
+ // FIN
+ break;
+ }
+
+ var outputBuffer = writer.GetMemory(_minAllocBufferSize);
+
+ var copyAmount = (int)Math.Min(outputBuffer.Length, readResult.Buffer.Length);
+ var bufferSlice = readResult.Buffer.Slice(0, copyAmount);
+
+ bufferSlice.CopyTo(outputBuffer.Span);
+
+ reader.AdvanceTo(bufferSlice.End);
+ writer.Advance(copyAmount);
+
+ var result = await writer.FlushAsync();
+
+ if (result.IsCompleted || result.IsCanceled)
+ {
+ // flushResult should not be canceled.
+ break;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ // Don't rethrow the exception. It should be handled by the Pipeline consumer.
+ error = ex;
+ }
+ finally
+ {
+ await _context.Transport.Input.CompleteAsync();
+ _input.Writer.Complete(error);
+ }
+ }
+
private class StreamCloseAwaitable : ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };
diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs
index 929446f502..ee310c1e52 100644
--- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs
+++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs
@@ -5,7 +5,6 @@ using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
-using Microsoft.AspNetCore.Connections;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
@@ -15,11 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
///
internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
- private readonly Pipe _input;
- private Task _inputTask;
private bool _disposed;
private readonly object _disposeLock = new object();
- private readonly int _minAllocBufferSize;
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) :
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
@@ -27,38 +23,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) :
- base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true)
+ base(duplexPipe.Input, duplexPipe.Output)
{
- Stream = createStream(this);
-
- var inputOptions = new PipeOptions(pool: readerOptions.Pool,
- readerScheduler: PipeScheduler.ThreadPool,
- writerScheduler: PipeScheduler.Inline,
- pauseWriterThreshold: 1,
- resumeWriterThreshold: 1,
- minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(),
- useSynchronizationContext: false);
-
- _minAllocBufferSize = writerOptions.MinimumBufferSize;
-
- _input = new Pipe(inputOptions);
- Output = PipeWriter.Create(Stream, writerOptions);
+ var stream = createStream(this);
+ Stream = stream;
+ Input = PipeReader.Create(stream, readerOptions);
+ Output = PipeWriter.Create(stream, writerOptions);
}
public TStream Stream { get; }
- public PipeReader Input
- {
- get
- {
- if (_inputTask == null)
- {
- _inputTask = ReadInputAsync();
- }
-
- return _input.Reader;
- }
- }
+ public PipeReader Input { get; }
public PipeWriter Output { get; }
@@ -73,65 +48,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_disposed = true;
}
- _input.Reader.Complete();
- Output.Complete();
-
- CancelPendingRead();
-
- if (_inputTask != null)
- {
- await _inputTask;
- }
+ await Input.CompleteAsync();
+ await Output.CompleteAsync();
}
protected override void Dispose(bool disposing)
{
throw new NotSupportedException();
}
-
- private async Task ReadInputAsync()
- {
- Exception error = null;
- try
- {
- while (true)
- {
- var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize);
-
- var bytesRead = await Stream.ReadAsync(outputBuffer);
- _input.Writer.Advance(bytesRead);
-
- if (bytesRead == 0)
- {
- // FIN
- break;
- }
-
- var result = await _input.Writer.FlushAsync();
-
- if (result.IsCompleted)
- {
- // flushResult should not be canceled.
- break;
- }
- }
-
- }
- catch (OperationCanceledException ex)
- {
- // Propagate the exception if it's ConnectionAbortedException
- error = ex as ConnectionAbortedException;
- }
- catch (Exception ex)
- {
- // Don't rethrow the exception. It should be handled by the Pipeline consumer.
- error = ex;
- }
- finally
- {
- _input.Writer.Complete(error);
- }
- }
}
}
diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs
index 90a84bea2e..cb41f2c2e5 100644
--- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs
+++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs
@@ -190,7 +190,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Theory]
- [Repeat(20)]
[InlineData((int)Http2FrameType.DATA)]
[InlineData((int)Http2FrameType.CONTINUATION)]
public async Task AbortedStream_ResetsAndDrainsRequest_RefusesFramesAfterCooldownExpires(int intFinalFrameType)