diff --git a/src/Servers/Kestrel/Core/src/Internal/Http/Http1MessageBody.cs b/src/Servers/Kestrel/Core/src/Internal/Http/Http1MessageBody.cs index eaed7c7be1..1aba7faf4a 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http/Http1MessageBody.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http/Http1MessageBody.cs @@ -3,8 +3,10 @@ using System; using System.Buffers; +using System.Diagnostics; using System.IO; using System.IO.Pipelines; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http; @@ -27,6 +29,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private async Task PumpAsync() { + Debug.Assert(!RequestUpgrade, "Upgraded connections should never use this code path!"); + Exception error = null; try @@ -81,14 +85,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // closing the connection without a response as expected. _context.OnInputOrOutputCompleted(); - // Treat any FIN from an upgraded request as expected. - // It's up to higher-level consumer (i.e. WebSocket middleware) to determine - // if the end is actually expected based on higher-level framing. - if (RequestUpgrade) - { - break; - } - BadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent); } } @@ -291,6 +287,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http return keepAlive ? MessageBody.ZeroContentLengthKeepAlive : MessageBody.ZeroContentLengthClose; } + /// + /// The upgrade stream uses the raw connection stream instead of going through the RequestBodyPipe. This + /// removes the redundant copy from the transport pipe to the body pipe. + /// private class ForUpgrade : Http1MessageBody { public ForUpgrade(Http1Connection context) @@ -299,14 +299,87 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http RequestUpgrade = true; } + // This returns IsEmpty so we can avoid draining the body (since it's basically an endless stream) public override bool IsEmpty => true; - protected override bool Read(ReadOnlySequence readableBuffer, PipeWriter writableBuffer, out SequencePosition consumed, out SequencePosition examined) + public override async Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default) { - Copy(readableBuffer, writableBuffer); - consumed = readableBuffer.End; - examined = readableBuffer.End; - return false; + while (true) + { + var result = await _context.Input.ReadAsync(cancellationToken); + var readableBuffer = result.Buffer; + var readableBufferLength = readableBuffer.Length; + + try + { + if (!readableBuffer.IsEmpty) + { + foreach (var memory in readableBuffer) + { + // REVIEW: This *could* be slower if 2 things are true + // - The WriteAsync(ReadOnlyMemory) isn't overridden on the destination + // - We change the Kestrel Memory Pool to not use pinned arrays but instead use native memory + await destination.WriteAsync(memory, cancellationToken); + } + } + + if (result.IsCompleted) + { + return; + } + } + finally + { + _context.Input.AdvanceTo(readableBuffer.End); + } + } + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + while (true) + { + var result = await _context.Input.ReadAsync(cancellationToken); + var readableBuffer = result.Buffer; + var readableBufferLength = readableBuffer.Length; + + var consumed = readableBuffer.End; + var actual = 0; + + try + { + if (readableBufferLength != 0) + { + // buffer.Length is int + actual = (int)Math.Min(readableBufferLength, buffer.Length); + + var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual); + consumed = slice.End; + slice.CopyTo(buffer.Span); + + return actual; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + _context.Input.AdvanceTo(consumed); + } + } + } + + public override Task ConsumeAsync() + { + return Task.CompletedTask; + } + + public override Task StopAsync() + { + return Task.CompletedTask; } }