Make HubConnection write messages directly to the PipeWriter (#1762)

This commit is contained in:
James Newton-King 2018-03-29 20:26:09 +13:00 committed by GitHub
parent 7f86b92f7e
commit cddf46c0cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 143 additions and 20 deletions

View File

@ -54,7 +54,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
[Benchmark] [Benchmark]
public async Task SuccessHandshakeAsync() public async Task SuccessHandshakeAsync()
{ {
_pipe.AddReadResult(_handshakeRequestResult); _pipe.AddReadResult(new ValueTask<ReadResult>(_handshakeRequestResult));
await _hubConnectionContext.HandshakeAsync(TimeSpan.FromSeconds(5), _supportedProtocols, _successHubProtocolResolver, _userIdProvider); await _hubConnectionContext.HandshakeAsync(TimeSpan.FromSeconds(5), _supportedProtocols, _successHubProtocolResolver, _userIdProvider);
} }
@ -62,7 +62,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
[Benchmark] [Benchmark]
public async Task ErrorHandshakeAsync() public async Task ErrorHandshakeAsync()
{ {
_pipe.AddReadResult(_handshakeRequestResult); _pipe.AddReadResult(new ValueTask<ReadResult>(_handshakeRequestResult));
await _hubConnectionContext.HandshakeAsync(TimeSpan.FromSeconds(5), _supportedProtocols, _failureHubProtocolResolver, _userIdProvider); await _hubConnectionContext.HandshakeAsync(TimeSpan.FromSeconds(5), _supportedProtocols, _failureHubProtocolResolver, _userIdProvider);
} }

View File

@ -0,0 +1,76 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
public class HubConnectionSendBenchmark
{
private HubConnection _hubConnection;
private TestDuplexPipe _pipe;
private TaskCompletionSource<ReadResult> _tcs;
private object[] _arguments;
[GlobalSetup]
public void GlobalSetup()
{
var ms = new MemoryBufferWriter();
HandshakeProtocol.WriteResponseMessage(HandshakeResponseMessage.Empty, ms);
var handshakeResponseResult = new ReadResult(new ReadOnlySequence<byte>(ms.ToArray()), false, false);
_pipe = new TestDuplexPipe();
_pipe.AddReadResult(new ValueTask<ReadResult>(handshakeResponseResult));
_tcs = new TaskCompletionSource<ReadResult>();
_pipe.AddReadResult(new ValueTask<ReadResult>(_tcs.Task));
var connection = new TestConnection();
// prevents keep alive time being activated
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
connection.Transport = _pipe;
var protocol = Protocol == "json" ? (IHubProtocol)new JsonHubProtocol() : new MessagePackHubProtocol();
_hubConnection = new HubConnection(() => connection, protocol, new NullLoggerFactory());
_hubConnection.StartAsync().GetAwaiter().GetResult();
_arguments = new object[ArgumentCount];
for (int i = 0; i < _arguments.Length; i++)
{
_arguments[i] = "Hello world!";
}
}
[Params(0, 1, 10, 100)]
public int ArgumentCount;
[Params("json", "msgpack")]
public string Protocol;
[GlobalCleanup]
public void GlobalCleanup()
{
_tcs.SetResult(new ReadResult(default, false, true));
_hubConnection.StopAsync().GetAwaiter().GetResult();
}
[Benchmark]
public Task SendAsync()
{
return _hubConnection.SendAsync("Dummy", _arguments);
}
}
}

View File

@ -21,7 +21,7 @@ using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{ {
public class HubConnectionBenchmark public class HubConnectionStartBenchmark
{ {
private HubConnection _hubConnection; private HubConnection _hubConnection;
private TestDuplexPipe _pipe; private TestDuplexPipe _pipe;
@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
private void AddHandshakeResponse() private void AddHandshakeResponse()
{ {
_pipe.AddReadResult(_handshakeResponseResult); _pipe.AddReadResult(new ValueTask<ReadResult>(_handshakeResponseResult));
} }
[Benchmark] [Benchmark]

View File

@ -0,0 +1,31 @@
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Sockets.Client;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared
{
public class TestConnection : IConnection
{
public Task StartAsync()
{
throw new NotImplementedException();
}
public Task StartAsync(TransferFormat transferFormat)
{
return Task.CompletedTask;
}
public Task DisposeAsync()
{
return Task.CompletedTask;
}
public IDuplexPipe Transport { get; set; }
public IFeatureCollection Features { get; } = new FeatureCollection();
}
}

View File

@ -0,0 +1,10 @@
using System;
using Microsoft.AspNetCore.Connections.Features;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared
{
public class TestConnectionInherentKeepAliveFeature : IConnectionInherentKeepAliveFeature
{
public TimeSpan KeepAliveInterval { get; } = TimeSpan.Zero;
}
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared
{ {
@ -19,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared
Output = new TestPipeWriter(); Output = new TestPipeWriter();
} }
public void AddReadResult(ReadResult readResult) public void AddReadResult(ValueTask<ReadResult> readResult)
{ {
_input.ReadResults.Add(readResult); _input.ReadResults.Add(readResult);
} }

View File

@ -11,11 +11,11 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared
{ {
public class TestPipeReader : PipeReader public class TestPipeReader : PipeReader
{ {
public List<ReadResult> ReadResults { get; } public List<ValueTask<ReadResult>> ReadResults { get; }
public TestPipeReader() public TestPipeReader()
{ {
ReadResults = new List<ReadResult>(); ReadResults = new List<ValueTask<ReadResult>>();
} }
public override void AdvanceTo(SequencePosition consumed) public override void AdvanceTo(SequencePosition consumed)
@ -51,7 +51,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared
var result = ReadResults[0]; var result = ReadResults[0];
ReadResults.RemoveAt(0); ReadResults.RemoveAt(0);
return new ValueTask<ReadResult>(result); return result;
} }
public override bool TryRead(out ReadResult result) public override bool TryRead(out ReadResult result)

View File

@ -32,7 +32,6 @@ namespace System.IO.Pipelines
public override void Flush() public override void Flush()
{ {
throw new NotSupportedException();
} }
public override int Read(byte[] buffer, int offset, int count) public override int Read(byte[] buffer, int offset, int count)

View File

@ -39,7 +39,6 @@ namespace Microsoft.AspNetCore.SignalR.Client
private bool _disposed; private bool _disposed;
// Transient state to a connection // Transient state to a connection
private readonly object _pendingCallsLock = new object();
private ConnectionState _connectionState; private ConnectionState _connectionState;
public event Action<Exception> Closed; public event Action<Exception> Closed;
@ -343,11 +342,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
{ {
AssertConnectionValid(); AssertConnectionValid();
var payload = _protocol.WriteToArray(hubMessage); _protocol.WriteMessage(hubMessage, _connectionState.OutputStream);
Log.SendingMessage(_logger, hubMessage); Log.SendingMessage(_logger, hubMessage);
// REVIEW: If a token is passed in and is cancelled during FlushAsync it seems to break .Complete()... // REVIEW: If a token is passed in and is cancelled during FlushAsync it seems to break .Complete()...
await WriteAsync(payload, CancellationToken.None); await _connectionState.Connection.Transport.Output.FlushAsync();
Log.MessageSent(_logger, hubMessage); Log.MessageSent(_logger, hubMessage);
} }
@ -727,12 +728,6 @@ namespace Microsoft.AspNetCore.SignalR.Client
} }
} }
private ValueTask<FlushResult> WriteAsync(byte[] payload, CancellationToken cancellationToken = default)
{
AssertConnectionValid();
return _connectionState.Connection.Transport.Output.WriteAsync(payload, cancellationToken);
}
private void CheckConnectionActive(string methodName) private void CheckConnectionActive(string methodName)
{ {
if (_connectionState == null || _connectionState.Stopping) if (_connectionState == null || _connectionState.Stopping)
@ -828,6 +823,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public IConnection Connection { get; } public IConnection Connection { get; }
public Task ReceiveTask { get; set; } public Task ReceiveTask { get; set; }
public Exception CloseException { get; set; } public Exception CloseException { get; set; }
public PipeWriterStream OutputStream { get; }
public bool Stopping public bool Stopping
{ {
@ -839,6 +835,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
{ {
_hubConnection = hubConnection; _hubConnection = hubConnection;
Connection = connection; Connection = connection;
OutputStream = new PipeWriterStream(Connection.Transport.Output);
} }
public string GetNextId() => Interlocked.Increment(ref _nextId).ToString(); public string GetNextId() => Interlocked.Increment(ref _nextId).ToString();

View File

@ -8,6 +8,7 @@
<ItemGroup> <ItemGroup>
<Compile Include="..\Common\ForceAsyncAwaiter.cs" Link="ForceAsyncAwaiter.cs" /> <Compile Include="..\Common\ForceAsyncAwaiter.cs" Link="ForceAsyncAwaiter.cs" />
<Compile Include="..\Common\PipeWriterStream.cs" Link="PipeWriterStream.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -296,10 +296,18 @@ namespace Microsoft.AspNetCore.Sockets.Client.Http
request.Version = new Version(1, 1); request.Version = new Version(1, 1);
SendUtils.PrepareHttpRequest(request, _httpOptions); SendUtils.PrepareHttpRequest(request, _httpOptions);
using (var response = await httpClient.SendAsync(request)) // ResponseHeadersRead instructs SendAsync to return once headers are read
// rather than buffer the entire response. This gives a small perf boost.
// Note that it is important to dispose of the response when doing this to
// avoid leaving the connection open.
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
{ {
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
var negotiateResponse = NegotiateProtocol.ParseResponse(await response.Content.ReadAsStreamAsync()); NegotiationResponse negotiateResponse;
using (var responseStream = await response.Content.ReadAsStreamAsync())
{
negotiateResponse = NegotiateProtocol.ParseResponse(responseStream);
}
Log.ConnectionEstablished(_logger, negotiateResponse.ConnectionId); Log.ConnectionEstablished(_logger, negotiateResponse.ConnectionId);
return negotiateResponse; return negotiateResponse;
} }