From cddf46c0cdffc88edc6e784fda6cfa50493b7a4f Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Thu, 29 Mar 2018 20:26:09 +1300 Subject: [PATCH] Make HubConnection write messages directly to the PipeWriter (#1762) --- .../HubConnectionContextBenchmark.cs | 4 +- .../HubConnectionSendBenchmark.cs | 76 +++++++++++++++++++ ...mark.cs => HubConnectionStartBenchmark.cs} | 4 +- .../Shared/TestConnection.cs | 31 ++++++++ .../TestConnectionInherentKeepAliveFeature.cs | 10 +++ .../Shared/TestDuplexPipe.cs | 3 +- .../Shared/TestPipeReader.cs | 6 +- src/Common/PipeWriterStream.cs | 1 - .../HubConnection.cs | 15 ++-- ...soft.AspNetCore.SignalR.Client.Core.csproj | 1 + .../HttpConnection.cs | 12 ++- 11 files changed, 143 insertions(+), 20 deletions(-) create mode 100644 benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionSendBenchmark.cs rename benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/{HubConnectionBenchmark.cs => HubConnectionStartBenchmark.cs} (95%) create mode 100644 benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnection.cs create mode 100644 benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnectionInherentKeepAliveFeature.cs diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionContextBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionContextBenchmark.cs index 55c5e77c6a..b76ba0bb07 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionContextBenchmark.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionContextBenchmark.cs @@ -54,7 +54,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks [Benchmark] public async Task SuccessHandshakeAsync() { - _pipe.AddReadResult(_handshakeRequestResult); + _pipe.AddReadResult(new ValueTask(_handshakeRequestResult)); await _hubConnectionContext.HandshakeAsync(TimeSpan.FromSeconds(5), _supportedProtocols, _successHubProtocolResolver, _userIdProvider); } @@ -62,7 +62,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks [Benchmark] public async Task ErrorHandshakeAsync() { - _pipe.AddReadResult(_handshakeRequestResult); + _pipe.AddReadResult(new ValueTask(_handshakeRequestResult)); await _hubConnectionContext.HandshakeAsync(TimeSpan.FromSeconds(5), _supportedProtocols, _failureHubProtocolResolver, _userIdProvider); } diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionSendBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionSendBenchmark.cs new file mode 100644 index 0000000000..3be5712f02 --- /dev/null +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionSendBenchmark.cs @@ -0,0 +1,76 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using Microsoft.AspNetCore.Connections.Features; +using Microsoft.AspNetCore.SignalR.Client; +using Microsoft.AspNetCore.SignalR.Internal; +using Microsoft.AspNetCore.SignalR.Internal.Protocol; +using Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Microsoft.AspNetCore.SignalR.Microbenchmarks +{ + public class HubConnectionSendBenchmark + { + private HubConnection _hubConnection; + private TestDuplexPipe _pipe; + private TaskCompletionSource _tcs; + private object[] _arguments; + + [GlobalSetup] + public void GlobalSetup() + { + var ms = new MemoryBufferWriter(); + HandshakeProtocol.WriteResponseMessage(HandshakeResponseMessage.Empty, ms); + var handshakeResponseResult = new ReadResult(new ReadOnlySequence(ms.ToArray()), false, false); + + _pipe = new TestDuplexPipe(); + _pipe.AddReadResult(new ValueTask(handshakeResponseResult)); + + _tcs = new TaskCompletionSource(); + _pipe.AddReadResult(new ValueTask(_tcs.Task)); + + var connection = new TestConnection(); + // prevents keep alive time being activated + connection.Features.Set(new TestConnectionInherentKeepAliveFeature()); + connection.Transport = _pipe; + + var protocol = Protocol == "json" ? (IHubProtocol)new JsonHubProtocol() : new MessagePackHubProtocol(); + _hubConnection = new HubConnection(() => connection, protocol, new NullLoggerFactory()); + _hubConnection.StartAsync().GetAwaiter().GetResult(); + + _arguments = new object[ArgumentCount]; + for (int i = 0; i < _arguments.Length; i++) + { + _arguments[i] = "Hello world!"; + } + } + + [Params(0, 1, 10, 100)] + public int ArgumentCount; + + [Params("json", "msgpack")] + public string Protocol; + + [GlobalCleanup] + public void GlobalCleanup() + { + _tcs.SetResult(new ReadResult(default, false, true)); + _hubConnection.StopAsync().GetAwaiter().GetResult(); + } + + [Benchmark] + public Task SendAsync() + { + return _hubConnection.SendAsync("Dummy", _arguments); + } + } +} \ No newline at end of file diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionStartBenchmark.cs similarity index 95% rename from benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionBenchmark.cs rename to benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionStartBenchmark.cs index 730142aef7..bf5dcf2215 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionBenchmark.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubConnectionStartBenchmark.cs @@ -21,7 +21,7 @@ using Microsoft.Extensions.Logging.Abstractions; namespace Microsoft.AspNetCore.SignalR.Microbenchmarks { - public class HubConnectionBenchmark + public class HubConnectionStartBenchmark { private HubConnection _hubConnection; private TestDuplexPipe _pipe; @@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks private void AddHandshakeResponse() { - _pipe.AddReadResult(_handshakeResponseResult); + _pipe.AddReadResult(new ValueTask(_handshakeResponseResult)); } [Benchmark] diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnection.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnection.cs new file mode 100644 index 0000000000..ec141a2be3 --- /dev/null +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnection.cs @@ -0,0 +1,31 @@ +using System; +using System.IO.Pipelines; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Http.Features; +using Microsoft.AspNetCore.Sockets.Client; + +namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared +{ + public class TestConnection : IConnection + { + public Task StartAsync() + { + throw new NotImplementedException(); + } + + public Task StartAsync(TransferFormat transferFormat) + { + return Task.CompletedTask; + } + + public Task DisposeAsync() + { + return Task.CompletedTask; + } + + public IDuplexPipe Transport { get; set; } + + public IFeatureCollection Features { get; } = new FeatureCollection(); + } +} \ No newline at end of file diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnectionInherentKeepAliveFeature.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnectionInherentKeepAliveFeature.cs new file mode 100644 index 0000000000..2b9fc07039 --- /dev/null +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestConnectionInherentKeepAliveFeature.cs @@ -0,0 +1,10 @@ +using System; +using Microsoft.AspNetCore.Connections.Features; + +namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared +{ + public class TestConnectionInherentKeepAliveFeature : IConnectionInherentKeepAliveFeature + { + public TimeSpan KeepAliveInterval { get; } = TimeSpan.Zero; + } +} \ No newline at end of file diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestDuplexPipe.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestDuplexPipe.cs index 805bd9c170..91b8e96d3b 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestDuplexPipe.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestDuplexPipe.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.IO.Pipelines; +using System.Threading.Tasks; namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared { @@ -19,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared Output = new TestPipeWriter(); } - public void AddReadResult(ReadResult readResult) + public void AddReadResult(ValueTask readResult) { _input.ReadResults.Add(readResult); } diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestPipeReader.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestPipeReader.cs index eaa38767b9..a52ffac545 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestPipeReader.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/Shared/TestPipeReader.cs @@ -11,11 +11,11 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared { public class TestPipeReader : PipeReader { - public List ReadResults { get; } + public List> ReadResults { get; } public TestPipeReader() { - ReadResults = new List(); + ReadResults = new List>(); } public override void AdvanceTo(SequencePosition consumed) @@ -51,7 +51,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared var result = ReadResults[0]; ReadResults.RemoveAt(0); - return new ValueTask(result); + return result; } public override bool TryRead(out ReadResult result) diff --git a/src/Common/PipeWriterStream.cs b/src/Common/PipeWriterStream.cs index df6f85aaa1..ecf2b239ca 100644 --- a/src/Common/PipeWriterStream.cs +++ b/src/Common/PipeWriterStream.cs @@ -32,7 +32,6 @@ namespace System.IO.Pipelines public override void Flush() { - throw new NotSupportedException(); } public override int Read(byte[] buffer, int offset, int count) diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs index 6d21f1729b..3351195c26 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs @@ -39,7 +39,6 @@ namespace Microsoft.AspNetCore.SignalR.Client private bool _disposed; // Transient state to a connection - private readonly object _pendingCallsLock = new object(); private ConnectionState _connectionState; public event Action Closed; @@ -343,11 +342,13 @@ namespace Microsoft.AspNetCore.SignalR.Client { AssertConnectionValid(); - var payload = _protocol.WriteToArray(hubMessage); + _protocol.WriteMessage(hubMessage, _connectionState.OutputStream); Log.SendingMessage(_logger, hubMessage); + // REVIEW: If a token is passed in and is cancelled during FlushAsync it seems to break .Complete()... - await WriteAsync(payload, CancellationToken.None); + await _connectionState.Connection.Transport.Output.FlushAsync(); + Log.MessageSent(_logger, hubMessage); } @@ -727,12 +728,6 @@ namespace Microsoft.AspNetCore.SignalR.Client } } - private ValueTask WriteAsync(byte[] payload, CancellationToken cancellationToken = default) - { - AssertConnectionValid(); - return _connectionState.Connection.Transport.Output.WriteAsync(payload, cancellationToken); - } - private void CheckConnectionActive(string methodName) { if (_connectionState == null || _connectionState.Stopping) @@ -828,6 +823,7 @@ namespace Microsoft.AspNetCore.SignalR.Client public IConnection Connection { get; } public Task ReceiveTask { get; set; } public Exception CloseException { get; set; } + public PipeWriterStream OutputStream { get; } public bool Stopping { @@ -839,6 +835,7 @@ namespace Microsoft.AspNetCore.SignalR.Client { _hubConnection = hubConnection; Connection = connection; + OutputStream = new PipeWriterStream(Connection.Transport.Output); } public string GetNextId() => Interlocked.Increment(ref _nextId).ToString(); diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/Microsoft.AspNetCore.SignalR.Client.Core.csproj b/src/Microsoft.AspNetCore.SignalR.Client.Core/Microsoft.AspNetCore.SignalR.Client.Core.csproj index 6e262df8a0..455b7599db 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/Microsoft.AspNetCore.SignalR.Client.Core.csproj +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/Microsoft.AspNetCore.SignalR.Client.Core.csproj @@ -8,6 +8,7 @@ + diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs index 1b94e02235..ecae0d7d3c 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs @@ -296,10 +296,18 @@ namespace Microsoft.AspNetCore.Sockets.Client.Http request.Version = new Version(1, 1); SendUtils.PrepareHttpRequest(request, _httpOptions); - using (var response = await httpClient.SendAsync(request)) + // ResponseHeadersRead instructs SendAsync to return once headers are read + // rather than buffer the entire response. This gives a small perf boost. + // Note that it is important to dispose of the response when doing this to + // avoid leaving the connection open. + using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead)) { response.EnsureSuccessStatusCode(); - var negotiateResponse = NegotiateProtocol.ParseResponse(await response.Content.ReadAsStreamAsync()); + NegotiationResponse negotiateResponse; + using (var responseStream = await response.Content.ReadAsStreamAsync()) + { + negotiateResponse = NegotiateProtocol.ParseResponse(responseStream); + } Log.ConnectionEstablished(_logger, negotiateResponse.ConnectionId); return negotiateResponse; }