Only use read Pipe when running on Http2. (#12737)

This commit is contained in:
Justin Kotalik 2019-08-02 08:02:59 -07:00 committed by GitHub
parent 8dd1be9474
commit 02d28e1c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 86 deletions

View File

@ -45,6 +45,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly HttpConnectionContext _context;
private readonly Http2FrameWriter _frameWriter;
private readonly Pipe _input;
private Task _inputTask;
private readonly int _minAllocBufferSize;
private readonly HPackDecoder _hpackDecoder;
private readonly InputFlowControl _inputFlowControl;
private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize);
@ -91,6 +94,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
context.MemoryPool,
context.ServiceContext.Log);
var inputOptions = new PipeOptions(pool: context.MemoryPool,
readerScheduler: context.ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
minimumSegmentSize: context.MemoryPool.GetMinimumSegmentSize(),
useSynchronizationContext: false);
_input = new Pipe(inputOptions);
_minAllocBufferSize = context.MemoryPool.GetMinimumAllocSize();
_hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize);
var connectionWindow = (uint)http2Limits.InitialConnectionWindowSize;
@ -101,10 +115,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_serverSettings.HeaderTableSize = (uint)http2Limits.HeaderTableSize;
_serverSettings.MaxHeaderListSize = (uint)httpLimits.MaxRequestHeadersTotalSize;
_serverSettings.InitialWindowSize = (uint)http2Limits.InitialStreamWindowSize;
_inputTask = ReadInputAsync();
}
public string ConnectionId => _context.ConnectionId;
public PipeReader Input => _context.Transport.Input;
public PipeReader Input => _input.Reader;
public IKestrelTrace Log => _context.ServiceContext.Log;
public IFeatureCollection ConnectionFeatures => _context.ConnectionFeatures;
public ISystemClock SystemClock => _context.ServiceContext.SystemClock;
@ -312,6 +329,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
finally
{
Input.Complete();
_context.Transport.Input.CancelPendingRead();
await _inputTask;
}
}
}
@ -1261,6 +1280,55 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
Interlocked.Decrement(ref _clientActiveStreamCount);
}
private async Task ReadInputAsync()
{
Exception error = null;
try
{
while (true)
{
var reader = _context.Transport.Input;
var writer = _input.Writer;
var readResult = await reader.ReadAsync();
if ((readResult.IsCompleted && readResult.Buffer.Length == 0) || readResult.IsCanceled)
{
// FIN
break;
}
var outputBuffer = writer.GetMemory(_minAllocBufferSize);
var copyAmount = (int)Math.Min(outputBuffer.Length, readResult.Buffer.Length);
var bufferSlice = readResult.Buffer.Slice(0, copyAmount);
bufferSlice.CopyTo(outputBuffer.Span);
reader.AdvanceTo(bufferSlice.End);
writer.Advance(copyAmount);
var result = await writer.FlushAsync();
if (result.IsCompleted || result.IsCanceled)
{
// flushResult should not be canceled.
break;
}
}
}
catch (Exception ex)
{
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
error = ex;
}
finally
{
await _context.Transport.Input.CompleteAsync();
_input.Writer.Complete(error);
}
}
private class StreamCloseAwaitable : ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };

View File

@ -5,7 +5,6 @@ using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
@ -15,11 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
/// <typeparam name="TStream"></typeparam>
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
private readonly Pipe _input;
private Task _inputTask;
private bool _disposed;
private readonly object _disposeLock = new object();
private readonly int _minAllocBufferSize;
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
@ -27,38 +23,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true)
base(duplexPipe.Input, duplexPipe.Output)
{
Stream = createStream(this);
var inputOptions = new PipeOptions(pool: readerOptions.Pool,
readerScheduler: PipeScheduler.ThreadPool,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(),
useSynchronizationContext: false);
_minAllocBufferSize = writerOptions.MinimumBufferSize;
_input = new Pipe(inputOptions);
Output = PipeWriter.Create(Stream, writerOptions);
var stream = createStream(this);
Stream = stream;
Input = PipeReader.Create(stream, readerOptions);
Output = PipeWriter.Create(stream, writerOptions);
}
public TStream Stream { get; }
public PipeReader Input
{
get
{
if (_inputTask == null)
{
_inputTask = ReadInputAsync();
}
return _input.Reader;
}
}
public PipeReader Input { get; }
public PipeWriter Output { get; }
@ -73,65 +48,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_disposed = true;
}
_input.Reader.Complete();
Output.Complete();
CancelPendingRead();
if (_inputTask != null)
{
await _inputTask;
}
await Input.CompleteAsync();
await Output.CompleteAsync();
}
protected override void Dispose(bool disposing)
{
throw new NotSupportedException();
}
private async Task ReadInputAsync()
{
Exception error = null;
try
{
while (true)
{
var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize);
var bytesRead = await Stream.ReadAsync(outputBuffer);
_input.Writer.Advance(bytesRead);
if (bytesRead == 0)
{
// FIN
break;
}
var result = await _input.Writer.FlushAsync();
if (result.IsCompleted)
{
// flushResult should not be canceled.
break;
}
}
}
catch (OperationCanceledException ex)
{
// Propagate the exception if it's ConnectionAbortedException
error = ex as ConnectionAbortedException;
}
catch (Exception ex)
{
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
error = ex;
}
finally
{
_input.Writer.Complete(error);
}
}
}
}

View File

@ -190,7 +190,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Theory]
[Repeat(20)]
[InlineData((int)Http2FrameType.DATA)]
[InlineData((int)Http2FrameType.CONTINUATION)]
public async Task AbortedStream_ResetsAndDrainsRequest_RefusesFramesAfterCooldownExpires(int intFinalFrameType)