Allocate pipe writer stream per connection (#1885)

- Don't create the PipeWriterStream per operation, make it per Connection
- Reduce the buffer size for CopyToAsync operations to 4K where possible instead of 81K (the default)
This commit is contained in:
David Fowler 2018-04-06 14:36:35 -07:00 committed by GitHub
parent e51676fb47
commit f632330d7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 49 additions and 14 deletions

View File

@ -85,5 +85,10 @@ namespace System.IO.Pipelines
async ValueTask WriteSlowAsync(ValueTask<FlushResult> flushTask) => await flushTask;
}
public void Reset()
{
_length = 0;
}
}
}

View File

@ -108,6 +108,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
Log.StartReceive(_logger);
// Allocate this once for the duration of the transport so we can continuously write to it
var applicationStream = new PipeWriterStream(_application.Output);
try
{
while (!cancellationToken.IsCancellationRequested)
@ -143,8 +146,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
Log.ReceivedMessages(_logger);
var stream = new PipeWriterStream(_application.Output);
await response.Content.CopyToAsync(stream);
await response.Content.CopyToAsync(applicationStream);
var flushResult = await _application.Output.FlushAsync();
// We canceled in the middle of applying back pressure

View File

@ -30,9 +30,7 @@ namespace System.IO.Pipelines
{
try
{
// REVIEW: Should we use the default buffer size here?
// 81920 is the default bufferSize, there is no stream.CopyToAsync overload that takes only a cancellationToken
await stream.CopyToAsync(new PipeWriterStream(pipe.Writer), bufferSize: 81920, cancellationToken);
await stream.CopyToAsync(new PipeWriterStream(pipe.Writer), bufferSize: 4096, cancellationToken);
}
catch (OperationCanceledException)
{

View File

@ -30,6 +30,8 @@ namespace Microsoft.AspNetCore.Http.Connections
private readonly object _heartbeatLock = new object();
private List<(Action<object> handler, object state)> _heartbeatHandlers;
private readonly ILogger _logger;
private PipeWriterStream _applicationStream;
private IDuplexPipe _application;
// This tcs exists so that multiple calls to DisposeAsync all wait asynchronously
// on the same task
@ -93,7 +95,27 @@ namespace Microsoft.AspNetCore.Http.Connections
public override IDictionary<object, object> Items { get; set; } = new ConnectionItems(new ConcurrentDictionary<object, object>());
public IDuplexPipe Application { get; set; }
public IDuplexPipe Application
{
get
{
return _application;
}
set
{
if (value != null)
{
_applicationStream = new PipeWriterStream(value.Output);
}
else
{
_applicationStream = null;
}
_application = value;
}
}
internal PipeWriterStream ApplicationStream => _applicationStream;
public override IDuplexPipe Transport { get; set; }

View File

@ -447,7 +447,7 @@ namespace Microsoft.AspNetCore.Http.Connections
return;
}
var pipeWriterStream = new PipeWriterStream(connection.Application.Output);
const int bufferSize = 4096;
// REVIEW: Consider spliting the connection lock into a read lock and a write lock
// Need to think about HttpConnectionContext.DisposeAsync and whether one or both locks would be needed
@ -464,15 +464,18 @@ namespace Microsoft.AspNetCore.Http.Connections
context.Response.ContentType = "text/plain";
return;
}
await context.Request.Body.CopyToAsync(connection.ApplicationStream, bufferSize);
await context.Request.Body.CopyToAsync(pipeWriterStream);
Log.ReceivedBytes(_logger, connection.ApplicationStream.Length);
// Clear the amount of read bytes so logging is accurate
connection.ApplicationStream.Reset();
}
finally
{
connection.Lock.Release();
}
Log.ReceivedBytes(_logger, pipeWriterStream.Length);
}
private async Task<bool> EnsureConnectionStateAsync(HttpConnectionContext connection, HttpContext context, HttpTransportType transportType, HttpTransportType supportedTransports, ConnectionLogScope logScope, HttpConnectionOptions options)

View File

@ -0,0 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Microsoft.AspNetCore.Http.Connections.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]

View File

@ -495,10 +495,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
builder.UseConnectionHandler<TestConnectionHandler>();
var app = builder.Build();
Assert.Equal(0, connection.ApplicationStream.Length);
await dispatcher.ExecuteAsync(context, new HttpConnectionOptions(), app);
Assert.True(connection.Transport.Input.TryRead(out var result));
Assert.Equal("Hello World", Encoding.UTF8.GetString(result.Buffer.ToArray()));
Assert.Equal(0, connection.ApplicationStream.Length);
connection.Transport.Input.AdvanceTo(result.Buffer.End);
}
}

View File

@ -11,10 +11,6 @@
</Content>
</ItemGroup>
<ItemGroup>
<Compile Include="..\..\src\Common\DuplexPipe.cs" Link="DuplexPipe.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.Http.Connections\Microsoft.AspNetCore.Http.Connections.csproj" />
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj" />