Reduce copies for WebSocket reads (#6656)
- Change the upgrade message body to use the transport pipe instead of the request body pipe. - Remove some logic from the base type (more can be removed but this is a conservative change) - Improve performance of the ReadAsync call
This commit is contained in:
parent
cd0eab88ea
commit
3f1760c52b
|
|
@ -3,8 +3,10 @@
|
|||
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
|
|
@ -27,6 +29,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
private async Task PumpAsync()
|
||||
{
|
||||
Debug.Assert(!RequestUpgrade, "Upgraded connections should never use this code path!");
|
||||
|
||||
Exception error = null;
|
||||
|
||||
try
|
||||
|
|
@ -81,14 +85,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
// closing the connection without a response as expected.
|
||||
_context.OnInputOrOutputCompleted();
|
||||
|
||||
// Treat any FIN from an upgraded request as expected.
|
||||
// It's up to higher-level consumer (i.e. WebSocket middleware) to determine
|
||||
// if the end is actually expected based on higher-level framing.
|
||||
if (RequestUpgrade)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
BadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent);
|
||||
}
|
||||
}
|
||||
|
|
@ -291,6 +287,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
return keepAlive ? MessageBody.ZeroContentLengthKeepAlive : MessageBody.ZeroContentLengthClose;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The upgrade stream uses the raw connection stream instead of going through the RequestBodyPipe. This
|
||||
/// removes the redundant copy from the transport pipe to the body pipe.
|
||||
/// </summary>
|
||||
private class ForUpgrade : Http1MessageBody
|
||||
{
|
||||
public ForUpgrade(Http1Connection context)
|
||||
|
|
@ -299,14 +299,87 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
RequestUpgrade = true;
|
||||
}
|
||||
|
||||
// This returns IsEmpty so we can avoid draining the body (since it's basically an endless stream)
|
||||
public override bool IsEmpty => true;
|
||||
|
||||
protected override bool Read(ReadOnlySequence<byte> readableBuffer, PipeWriter writableBuffer, out SequencePosition consumed, out SequencePosition examined)
|
||||
public override async Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default)
|
||||
{
|
||||
Copy(readableBuffer, writableBuffer);
|
||||
consumed = readableBuffer.End;
|
||||
examined = readableBuffer.End;
|
||||
return false;
|
||||
while (true)
|
||||
{
|
||||
var result = await _context.Input.ReadAsync(cancellationToken);
|
||||
var readableBuffer = result.Buffer;
|
||||
var readableBufferLength = readableBuffer.Length;
|
||||
|
||||
try
|
||||
{
|
||||
if (!readableBuffer.IsEmpty)
|
||||
{
|
||||
foreach (var memory in readableBuffer)
|
||||
{
|
||||
// REVIEW: This *could* be slower if 2 things are true
|
||||
// - The WriteAsync(ReadOnlyMemory<byte>) isn't overridden on the destination
|
||||
// - We change the Kestrel Memory Pool to not use pinned arrays but instead use native memory
|
||||
await destination.WriteAsync(memory, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
if (result.IsCompleted)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_context.Input.AdvanceTo(readableBuffer.End);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
var result = await _context.Input.ReadAsync(cancellationToken);
|
||||
var readableBuffer = result.Buffer;
|
||||
var readableBufferLength = readableBuffer.Length;
|
||||
|
||||
var consumed = readableBuffer.End;
|
||||
var actual = 0;
|
||||
|
||||
try
|
||||
{
|
||||
if (readableBufferLength != 0)
|
||||
{
|
||||
// buffer.Length is int
|
||||
actual = (int)Math.Min(readableBufferLength, buffer.Length);
|
||||
|
||||
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
|
||||
consumed = slice.End;
|
||||
slice.CopyTo(buffer.Span);
|
||||
|
||||
return actual;
|
||||
}
|
||||
|
||||
if (result.IsCompleted)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_context.Input.AdvanceTo(consumed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override Task ConsumeAsync()
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public override Task StopAsync()
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue