// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;

namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
    /// <summary>
    /// Benchmarks that push hub messages straight into a <see cref="DefaultHubDispatcher{THub}"/>
    /// bound to <see cref="TestHub"/>, using a connection context whose writes are discarded.
    /// </summary>
    public class DefaultHubDispatcherBenchmark
    {
        private DefaultHubDispatcher<TestHub> _dispatcher;

        // Typed as the concrete test context (the only type GlobalSetup ever assigns) so the
        // benchmarks can reach ReceivedCompleted directly instead of through unchecked `as` casts.
        private NoErrorHubConnectionContext _connectionContext;

        /// <summary>
        /// Builds the dispatcher from a SignalR-core service provider and creates the connection
        /// context (over an in-memory duplex pipe pair) that every benchmark dispatches against.
        /// </summary>
        [GlobalSetup]
        public void GlobalSetup()
        {
            var serviceCollection = new ServiceCollection();
            serviceCollection.AddSignalRCore();

            var provider = serviceCollection.BuildServiceProvider();

            var serviceScopeFactory = provider.GetService<IServiceScopeFactory>();

            _dispatcher = new DefaultHubDispatcher<TestHub>(
                serviceScopeFactory,
                new HubContext<TestHub>(new DefaultHubLifetimeManager<TestHub>(NullLogger<DefaultHubLifetimeManager<TestHub>>.Instance)),
                enableDetailedErrors: false,
                new Logger<DefaultHubDispatcher<TestHub>>(NullLoggerFactory.Instance));

            var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
            var connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), pair.Application, pair.Transport);

            var contextOptions = new HubConnectionContextOptions()
            {
                KeepAliveInterval = TimeSpan.Zero,
            };
            _connectionContext = new NoErrorHubConnectionContext(connection, contextOptions, NullLoggerFactory.Instance);

            _connectionContext.Protocol = new FakeHubProtocol();
        }

        /// <summary>
        /// Stub <see cref="IHubProtocol"/>: it never parses a message and writes nothing.
        /// </summary>
        public class FakeHubProtocol : IHubProtocol
        {
            /// <summary>Never assigned, so this is always <c>null</c>.</summary>
            public string Name { get; }

            public int Version => 1;

            public int MinorVersion => 0;

            /// <summary>Never assigned, so this is the default <see cref="TransferFormat"/> value.</summary>
            public TransferFormat TransferFormat { get; }

            /// <summary>Accepts every protocol version.</summary>
            public bool IsVersionSupported(int version)
            {
                return true;
            }

            /// <summary>Always reports that no message was parsed and sets <paramref name="message"/> to <c>null</c>.</summary>
            public bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, out HubMessage message)
            {
                message = null;
                return false;
            }

            /// <summary>Intentionally empty: nothing is written to <paramref name="output"/>.</summary>
            public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
            {
            }

            /// <summary>Delegates to <see cref="HubProtocolExtensions.GetMessageBytes"/>.</summary>
            public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
            {
                return HubProtocolExtensions.GetMessageBytes(this, message);
            }
        }

        /// <summary>
        /// Connection context that discards outgoing messages, signals
        /// <see cref="ReceivedCompleted"/> when a <see cref="CompletionMessage"/> is written, and
        /// throws if that completion carries an error.
        /// </summary>
        public class NoErrorHubConnectionContext : HubConnectionContext
        {
            /// <summary>
            /// Completed when a <see cref="CompletionMessage"/> is written. Benchmarks that await it
            /// replace it with a fresh instance afterwards.
            /// </summary>
            public TaskCompletionSource<object> ReceivedCompleted = new TaskCompletionSource<object>();

            public NoErrorHubConnectionContext(ConnectionContext connectionContext, HubConnectionContextOptions contextOptions, ILoggerFactory loggerFactory)
                : base(connectionContext, contextOptions, loggerFactory)
            {
            }

            /// <summary>
            /// Drops <paramref name="message"/>; for a <see cref="CompletionMessage"/> it first
            /// signals <see cref="ReceivedCompleted"/>.
            /// </summary>
            /// <exception cref="Exception">The completion message has a non-empty error.</exception>
            public override ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken)
            {
                if (message is CompletionMessage completionMessage)
                {
                    // Signal before the error check so an awaiting benchmark is released even
                    // when the completion reports a failure.
                    ReceivedCompleted.TrySetResult(null);

                    if (!string.IsNullOrEmpty(completionMessage.Error))
                    {
                        throw new Exception("Error invoking hub method: " + completionMessage.Error);
                    }
                }

                return default;
            }
        }

        /// <summary>
        /// Hub whose methods cover the return shapes exercised by the benchmarks: void, Task,
        /// value-returning, ValueTask, channel/async-enumerable streaming, and client uploads.
        /// </summary>
        public class TestHub : Hub
        {
            public void Invocation()
            {
            }

            public Task InvocationAsync()
            {
                return Task.CompletedTask;
            }

            public int InvocationReturnValue()
            {
                return 1;
            }

            public Task<int> InvocationReturnAsync()
            {
                return Task.FromResult(1);
            }

            public ValueTask<int> InvocationValueTaskAsync()
            {
                return new ValueTask<int>(1);
            }

            /// <summary>Returns a channel that is already completed and holds no items.</summary>
            public ChannelReader<int> StreamChannelReader()
            {
                var channel = Channel.CreateUnbounded<int>();
                channel.Writer.Complete();

                return channel;
            }

            /// <summary>Task-wrapped variant of <see cref="StreamChannelReader"/>.</summary>
            public Task<ChannelReader<int>> StreamChannelReaderAsync()
            {
                var channel = Channel.CreateUnbounded<int>();
                channel.Writer.Complete();

                return Task.FromResult<ChannelReader<int>>(channel);
            }

            /// <summary>ValueTask-wrapped variant of <see cref="StreamChannelReader"/>.</summary>
            public ValueTask<ChannelReader<int>> StreamChannelReaderValueTaskAsync()
            {
                var channel = Channel.CreateUnbounded<int>();
                channel.Writer.Complete();

                return new ValueTask<ChannelReader<int>>(channel);
            }

            /// <summary>
            /// Returns a reader over a channel that a background task fills with the integers
            /// 0..<paramref name="count"/>-1 and then completes.
            /// </summary>
            public ChannelReader<int> StreamChannelReaderCount(int count)
            {
                var channel = Channel.CreateUnbounded<int>();

                _ = Task.Run(async () =>
                {
                    for (var i = 0; i < count; i++)
                    {
                        await channel.Writer.WriteAsync(i);
                    }

                    channel.Writer.Complete();
                });

                return channel.Reader;
            }

            /// <summary>Yields 0..<paramref name="count"/>-1 after an initial <see cref="Task.Yield"/>.</summary>
            public async IAsyncEnumerable<int> StreamIAsyncEnumerableCount(int count)
            {
                await Task.Yield();

                for (var i = 0; i < count; i++)
                {
                    yield return i;
                }
            }

            /// <summary>Yields 0..<paramref name="count"/>-1 after awaiting an already-completed task.</summary>
            public async IAsyncEnumerable<int> StreamIAsyncEnumerableCountCompletedTask(int count)
            {
                await Task.CompletedTask;

                for (var i = 0; i < count; i++)
                {
                    yield return i;
                }
            }

            /// <summary>Reads and discards every item until <paramref name="channelReader"/> completes.</summary>
            public async Task UploadStream(ChannelReader<string> channelReader)
            {
                while (await channelReader.WaitToReadAsync())
                {
                    while (channelReader.TryRead(out _))
                    {
                    }
                }
            }

            /// <summary>Enumerates and discards every item of <paramref name="stream"/>.</summary>
            public async Task UploadStreamIAsynEnumerable(IAsyncEnumerable<string> stream)
            {
                await foreach (var item in stream)
                {
                }
            }
        }

        [Benchmark]
        public Task Invocation()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.Invocation), Array.Empty<object>()));
        }

        [Benchmark]
        public Task InvocationAsync()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.InvocationAsync), Array.Empty<object>()));
        }

        [Benchmark]
        public Task InvocationReturnValue()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.InvocationReturnValue), Array.Empty<object>()));
        }

        [Benchmark]
        public Task InvocationReturnAsync()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.InvocationReturnAsync), Array.Empty<object>()));
        }

        [Benchmark]
        public Task InvocationValueTaskAsync()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.InvocationValueTaskAsync), Array.Empty<object>()));
        }

        [Benchmark]
        public Task StreamChannelReader()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamChannelReader), Array.Empty<object>()));
        }

        [Benchmark]
        public Task StreamChannelReaderAsync()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamChannelReaderAsync), Array.Empty<object>()));
        }

        [Benchmark]
        public Task StreamChannelReaderValueTaskAsync()
        {
            return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamChannelReaderValueTaskAsync), Array.Empty<object>()));
        }

        // Each streaming/upload benchmark below dispatches its message(s), waits for the
        // CompletionMessage to reach the connection context, then installs a fresh
        // TaskCompletionSource so the next iteration waits on its own completion.

        [Benchmark]
        public async Task StreamChannelReaderCount_Zero()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamChannelReaderCount), new object[] { 0 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamIAsyncEnumerableCount_Zero()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamIAsyncEnumerableCount), new object[] { 0 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamIAsyncEnumerableCompletedTaskCount_Zero()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamIAsyncEnumerableCountCompletedTask), new object[] { 0 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamChannelReaderCount_One()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamChannelReaderCount), new object[] { 1 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamIAsyncEnumerableCount_One()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamIAsyncEnumerableCount), new object[] { 1 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamIAsyncEnumerableCompletedTaskCount_One()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamIAsyncEnumerableCountCompletedTask), new object[] { 1 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamChannelReaderCount_Thousand()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamChannelReaderCount), new object[] { 1000 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamIAsyncEnumerableCount_Thousand()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamIAsyncEnumerableCount), new object[] { 1000 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task StreamIAsyncEnumerableCompletedTaskCount_Thousand()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", nameof(TestHub.StreamIAsyncEnumerableCountCompletedTask), new object[] { 1000 }));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task UploadStream_One()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStream), Array.Empty<object>(), streamIds: new string[] { "1" }));
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
            await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task UploadStreamIAsyncEnumerable_One()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStreamIAsynEnumerable), Array.Empty<object>(), streamIds: new string[] { "1" }));
            await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
            await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task UploadStream_Thousand()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStream), Array.Empty<object>(), streamIds: new string[] { "1" }));
            for (var i = 0; i < 1000; ++i)
            {
                await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
            }
            await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }

        [Benchmark]
        public async Task UploadStreamIAsyncEnumerable_Thousand()
        {
            await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStreamIAsynEnumerable), Array.Empty<object>(), streamIds: new string[] { "1" }));
            for (var i = 0; i < 1000; ++i)
            {
                await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
            }
            await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));

            await _connectionContext.ReceivedCompleted.Task;
            _connectionContext.ReceivedCompleted = new TaskCompletionSource<object>();
        }
    }
}