Revert the output pipe in the DuplexStreamPipeAdapter (#11601)
* Revert back to copying data to pipes * Replace the output pipe only * Don't complete the connection pipe in Http2FrameWriter - This leads to trunated data in some cases. Instead just yield the middleware so we can be sure no more user code is running (Http1OutputProducer does this as well). There are still cases where a misbeaving application that doesn't properly await writes gets cut off but that will be fixed in the SteamPipeWriter itself. - Updated tests
This commit is contained in:
parent
585b57593c
commit
6de357e7f2
|
|
@ -89,7 +89,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
|
|||
|
||||
_completed = true;
|
||||
_connectionOutputFlowControl.Abort();
|
||||
_outputWriter.Complete();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -103,7 +103,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
|
|||
|
||||
if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate)
|
||||
{
|
||||
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions);
|
||||
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions)
|
||||
{
|
||||
Log = _logger
|
||||
};
|
||||
certificateRequired = false;
|
||||
}
|
||||
else
|
||||
|
|
@ -140,7 +143,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
|
|||
}
|
||||
|
||||
return true;
|
||||
}));
|
||||
}))
|
||||
{
|
||||
Log = _logger
|
||||
};
|
||||
|
||||
certificateRequired = true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ using System;
|
|||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
||||
{
|
||||
|
|
@ -14,36 +15,114 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
/// <typeparam name="TStream"></typeparam>
|
||||
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
|
||||
{
|
||||
private readonly Pipe _output;
|
||||
private Task _outputTask;
|
||||
|
||||
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
|
||||
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
|
||||
{
|
||||
}
|
||||
|
||||
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) : base(duplexPipe.Input, duplexPipe.Output)
|
||||
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
|
||||
base(duplexPipe.Input, duplexPipe.Output)
|
||||
{
|
||||
Stream = createStream(this);
|
||||
|
||||
var outputOptions = new PipeOptions(pool: writerOptions.Pool,
|
||||
readerScheduler: PipeScheduler.Inline,
|
||||
writerScheduler: PipeScheduler.Inline,
|
||||
pauseWriterThreshold: 1,
|
||||
resumeWriterThreshold: 1,
|
||||
minimumSegmentSize: writerOptions.MinimumBufferSize,
|
||||
useSynchronizationContext: false);
|
||||
|
||||
Input = PipeReader.Create(Stream, readerOptions);
|
||||
Output = PipeWriter.Create(Stream, writerOptions);
|
||||
|
||||
// We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
|
||||
// about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
|
||||
// those patterns are fixed.
|
||||
_output = new Pipe(outputOptions);
|
||||
}
|
||||
|
||||
public ILogger Log { get; set; }
|
||||
|
||||
public TStream Stream { get; }
|
||||
|
||||
public PipeReader Input { get; }
|
||||
|
||||
public PipeWriter Output { get; }
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
public PipeWriter Output
|
||||
{
|
||||
Input.Complete();
|
||||
Output.Complete();
|
||||
base.Dispose(disposing);
|
||||
get
|
||||
{
|
||||
if (_outputTask == null)
|
||||
{
|
||||
_outputTask = WriteOutputAsync();
|
||||
}
|
||||
|
||||
return _output.Writer;
|
||||
}
|
||||
}
|
||||
|
||||
public override ValueTask DisposeAsync()
|
||||
public override async ValueTask DisposeAsync()
|
||||
{
|
||||
Input.Complete();
|
||||
Output.Complete();
|
||||
return base.DisposeAsync();
|
||||
_output.Writer.Complete();
|
||||
|
||||
if (_outputTask != null)
|
||||
{
|
||||
// Wait for the output task to complete, this ensures that we've copied
|
||||
// the application data to the underlying stream
|
||||
await _outputTask;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task WriteOutputAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
var result = await _output.Reader.ReadAsync();
|
||||
var buffer = result.Buffer;
|
||||
|
||||
try
|
||||
{
|
||||
if (buffer.IsEmpty)
|
||||
{
|
||||
if (result.IsCompleted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
await Stream.FlushAsync();
|
||||
}
|
||||
else if (buffer.IsSingleSegment)
|
||||
{
|
||||
await Stream.WriteAsync(buffer.First);
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (var memory in buffer)
|
||||
{
|
||||
await Stream.WriteAsync(memory);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_output.Reader.AdvanceTo(buffer.End);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
_output.Reader.Complete();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) :
|
||||
base(transport, stream => new LoggingStream(stream, logger))
|
||||
{
|
||||
Log = logger;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -467,7 +467,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
CreateConnection();
|
||||
}
|
||||
|
||||
_connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application));
|
||||
var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application));
|
||||
|
||||
async Task CompletePipeOnTaskCompletion()
|
||||
{
|
||||
try
|
||||
{
|
||||
await connectionTask;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_pair.Transport.Input.Complete();
|
||||
_pair.Transport.Output.Complete();
|
||||
}
|
||||
}
|
||||
|
||||
_connectionTask = CompletePipeOnTaskCompletion();
|
||||
|
||||
await SendPreambleAsync().ConfigureAwait(false);
|
||||
await SendSettingsAsync();
|
||||
|
|
|
|||
Loading…
Reference in New Issue