// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
    /// <summary>
    /// Benchmarks a <see cref="HubConnection"/> receiving a batch of invocation messages
    /// and dispatching them to a registered handler.
    /// </summary>
    /// <remarks>
    /// The connection's transport is a <see cref="TestDuplexPipe"/> whose read results are queued
    /// up front. The last queued result is always a pending task (<c>_nextReadTcs</c>); completing
    /// it is what releases the next batch of messages.
    /// </remarks>
    public class HubConnectionReceiveBenchmark
    {
        private const string MethodName = "TestMethodName";

        private static readonly object _invokeLock = new object();

        private HubConnection _hubConnection;
        private TestDuplexPipe _pipe;

        // Serialized invocation message that is handed to the connection for every simulated read.
        private ReadOnlyMemory<byte> _invocationMessageBytes;

        // Number of handler invocations observed in the current benchmark iteration.
        private int _currentIterationMessageCount;

        // Pending read that ReceiveAsync completes to deliver the first message of a batch.
        private TaskCompletionSource<ReadResult> _tcs;

        // Pending read queued after the current batch; becomes _tcs on the next iteration.
        private TaskCompletionSource<ReadResult> _nextReadTcs;

        // Completed by OnInvoke once MessageCount invocations have been handled.
        private TaskCompletionSource<bool> _waitTcs;

        /// <summary>
        /// Builds and starts the <see cref="HubConnection"/> over the test pipe, queues the handshake
        /// response, and pre-serializes the invocation message used by every iteration.
        /// </summary>
        [GlobalSetup]
        public void GlobalSetup()
        {
            var arguments = new object[ArgumentCount];
            for (var i = 0; i < arguments.Length; i++)
            {
                arguments[i] = "Hello world!";
            }

            // The first read result is the handshake response so that StartAsync can complete.
            var writer = MemoryBufferWriter.Get();
            try
            {
                HandshakeProtocol.WriteResponseMessage(HandshakeResponseMessage.Empty, writer);
                var handshakeResponseResult = new ReadResult(new ReadOnlySequence<byte>(writer.ToArray()), false, false);

                _pipe = new TestDuplexPipe();
                _pipe.AddReadResult(new ValueTask<ReadResult>(handshakeResponseResult));
            }
            finally
            {
                MemoryBufferWriter.Return(writer);
            }

            // The second read result stays pending until the first benchmark iteration completes it.
            _nextReadTcs = new TaskCompletionSource<ReadResult>();
            _pipe.AddReadResult(new ValueTask<ReadResult>(_nextReadTcs.Task));

            IHubProtocol hubProtocol;

            var hubConnectionBuilder = new HubConnectionBuilder();

            if (Protocol == "json")
            {
                hubProtocol = new JsonHubProtocol();
            }
            else
            {
                hubProtocol = new MessagePackHubProtocol();
            }

            hubConnectionBuilder.Services.TryAddEnumerable(ServiceDescriptor.Singleton(typeof(IHubProtocol), hubProtocol));

            _invocationMessageBytes = hubProtocol.GetMessageBytes(new InvocationMessage(MethodName, arguments));

            var delegateConnectionFactory = new DelegateConnectionFactory(
                format =>
                {
                    var connection = new DefaultConnectionContext();
                    // prevents keep alive time being activated
                    // Registered under the feature interface, which is presumably the key the client looks up.
                    connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
                    connection.Transport = _pipe;
                    return Task.FromResult<ConnectionContext>(connection);
                },
                connection =>
                {
                    connection.Transport.Output.Complete();
                    connection.Transport.Input.Complete();
                    return Task.CompletedTask;
                });

            // Registered as IConnectionFactory rather than as the concrete delegate type.
            hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);

            _hubConnection = hubConnectionBuilder.Build();
            _hubConnection.On(MethodName, arguments.Select(v => v.GetType()).ToArray(), OnInvoke);
            _hubConnection.StartAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Handler registered for <see cref="MethodName"/>. Counts invocations and completes
        /// <c>_waitTcs</c> once <see cref="MessageCount"/> of them have been observed.
        /// </summary>
        /// <param name="args">The invocation arguments; not inspected.</param>
        /// <returns>A completed task.</returns>
        /// <exception cref="InvalidOperationException">
        /// More invocations were counted than <see cref="MessageCount"/> allows.
        /// </exception>
        private Task OnInvoke(object[] args)
        {
            // HubConnection now runs this callback serially but just in case
            // add a lock in case of future experimentation
            lock (_invokeLock)
            {
                _currentIterationMessageCount++;

                if (_currentIterationMessageCount == MessageCount)
                {
                    // Batch finished: reset the counter for the next iteration and release ReceiveAsync.
                    _currentIterationMessageCount = 0;
                    _waitTcs.SetResult(true);
                }
                else if (_currentIterationMessageCount > MessageCount)
                {
                    throw new InvalidOperationException("Should never happen.");
                }
            }

            return Task.CompletedTask;
        }

        /// <summary>Number of string arguments carried by each invocation message.</summary>
        [Params(0, 1, 10, 100)]
        public int ArgumentCount;

        /// <summary>Number of invocation messages received per benchmark iteration.</summary>
        [Params(1, 100)]
        public int MessageCount;

        /// <summary>Hub protocol under test: "json" selects JSON, any other value selects MessagePack.</summary>
        [Params("json", "messagepack")]
        public string Protocol;

        /// <summary>
        /// Completes the outstanding pending read with a completed <see cref="ReadResult"/> and stops the connection.
        /// </summary>
        [GlobalCleanup]
        public void GlobalCleanup()
        {
            _nextReadTcs.SetResult(new ReadResult(default, false, true));
            _hubConnection.StopAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Queues the read results for one iteration: <see cref="MessageCount"/> - 1 ready messages
        /// followed by a new pending read, and creates a fresh completion source to wait on.
        /// </summary>
        public void OperationSetup()
        {
            // The currently pending read will carry the first message of this batch.
            _tcs = _nextReadTcs;

            // Add the results for additional messages (minus 1 because 1 result has already been added)
            for (int i = 0; i < MessageCount - 1; i++)
            {
                _pipe.AddReadResult(new ValueTask<ReadResult>(new ReadResult(new ReadOnlySequence<byte>(_invocationMessageBytes), false, false)));
            }

            // The receive task that will be waited on once messages are read
            _nextReadTcs = new TaskCompletionSource<ReadResult>();
            _pipe.AddReadResult(new ValueTask<ReadResult>(_nextReadTcs.Task));

            _waitTcs = new TaskCompletionSource<bool>();
        }

        /// <summary>
        /// Delivers one batch of <see cref="MessageCount"/> invocation messages to the connection
        /// and waits until the handler has been invoked for all of them.
        /// </summary>
        [Benchmark]
        public async Task ReceiveAsync()
        {
            // Setup messages
            OperationSetup();

            // Start receive of the next batch of messages
            _tcs.SetResult(new ReadResult(new ReadOnlySequence<byte>(_invocationMessageBytes), false, false));

            // Wait for all messages to be read and invoked
            await _waitTcs.Task.OrTimeout();
        }
    }
}