From ecd665c4717187210750117bfdcce6faaa8cc053 Mon Sep 17 00:00:00 2001 From: Andrew Stanton-Nurse Date: Mon, 9 Apr 2018 15:46:26 -0700 Subject: [PATCH] Crank 2: Let's Get Crankier (#1888) Ports Crank, from https://github.com/SignalR/DCrank to ASP.NET Core SignalR --- SignalR.sln | 9 +- benchmarks/BenchmarkServer/Hubs/EchoHub.cs | 8 +- benchmarks/BenchmarkServer/Program.cs | 5 +- benchmarks/BenchmarkServer/Startup.cs | 2 +- benchmarks/Crankier/Agent.cs | 302 ++++++++++++++++++ .../Crankier/AgentHeartbeatInformation.cs | 20 ++ benchmarks/Crankier/AgentReceiver.cs | 63 ++++ benchmarks/Crankier/AgentSender.cs | 76 +++++ benchmarks/Crankier/AgentWorker.cs | 72 +++++ benchmarks/Crankier/Client.cs | 128 ++++++++ benchmarks/Crankier/Commands/AgentCommand.cs | 17 + .../Crankier/Commands/CommandLineUtilities.cs | 32 ++ benchmarks/Crankier/Commands/Defaults.cs | 15 + benchmarks/Crankier/Commands/LocalCommand.cs | 79 +++++ benchmarks/Crankier/Commands/WorkerCommand.cs | 60 ++++ benchmarks/Crankier/ConnectionState.cs | 14 + benchmarks/Crankier/Crankier.csproj | 18 ++ benchmarks/Crankier/IAgent.cs | 14 + benchmarks/Crankier/IRunner.cs | 16 + benchmarks/Crankier/IWorker.cs | 17 + benchmarks/Crankier/Message.cs | 14 + benchmarks/Crankier/Program.cs | 54 ++++ benchmarks/Crankier/Runner.cs | 104 ++++++ benchmarks/Crankier/StatusInformation.cs | 32 ++ benchmarks/Crankier/Worker.cs | 173 ++++++++++ .../Crankier/WorkerHeartbeatInformation.cs | 18 ++ benchmarks/Crankier/WorkerReceiver.cs | 78 +++++ benchmarks/Crankier/WorkerSender.cs | 70 ++++ 28 files changed, 1505 insertions(+), 5 deletions(-) create mode 100644 benchmarks/Crankier/Agent.cs create mode 100644 benchmarks/Crankier/AgentHeartbeatInformation.cs create mode 100644 benchmarks/Crankier/AgentReceiver.cs create mode 100644 benchmarks/Crankier/AgentSender.cs create mode 100644 benchmarks/Crankier/AgentWorker.cs create mode 100644 benchmarks/Crankier/Client.cs create mode 100644 benchmarks/Crankier/Commands/AgentCommand.cs create mode 100644 benchmarks/Crankier/Commands/CommandLineUtilities.cs create mode 100644 benchmarks/Crankier/Commands/Defaults.cs create mode 100644 benchmarks/Crankier/Commands/LocalCommand.cs create mode 100644 benchmarks/Crankier/Commands/WorkerCommand.cs create mode 100644 benchmarks/Crankier/ConnectionState.cs create mode 100644 benchmarks/Crankier/Crankier.csproj create mode 100644 benchmarks/Crankier/IAgent.cs create mode 100644 benchmarks/Crankier/IRunner.cs create mode 100644 benchmarks/Crankier/IWorker.cs create mode 100644 benchmarks/Crankier/Message.cs create mode 100644 benchmarks/Crankier/Program.cs create mode 100644 benchmarks/Crankier/Runner.cs create mode 100644 benchmarks/Crankier/StatusInformation.cs create mode 100644 benchmarks/Crankier/Worker.cs create mode 100644 benchmarks/Crankier/WorkerHeartbeatInformation.cs create mode 100644 benchmarks/Crankier/WorkerReceiver.cs create mode 100644 benchmarks/Crankier/WorkerSender.cs diff --git a/SignalR.sln b/SignalR.sln index 78e41b75cd..2b438e1acd 100644 --- a/SignalR.sln +++ b/SignalR.sln @@ -19,8 +19,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{C4BC9889-B49F-41B6-806B-F84941B2549B}" - ProjectSection(SolutionItems) = preProject - EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SignalRSamples", "samples\SignalRSamples\SignalRSamples.csproj", "{C4AEAB04-F341-4539-B6C0-52368FB4BF9E}" EndProject @@ -87,6 +85,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BenchmarkServer", "benchmar EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Protocols.Json", "src\Microsoft.AspNetCore.SignalR.Protocols.Json\Microsoft.AspNetCore.SignalR.Protocols.Json.csproj", "{896FA5EE-63A5-4EAC-9F09-346584BB4830}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Crankier", "benchmarks\Crankier\Crankier.csproj", "{8D3E3E7D-452B-44F4-86CA-111003EA11ED}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -205,6 +205,10 @@ Global {896FA5EE-63A5-4EAC-9F09-346584BB4830}.Debug|Any CPU.Build.0 = Debug|Any CPU {896FA5EE-63A5-4EAC-9F09-346584BB4830}.Release|Any CPU.ActiveCfg = Release|Any CPU {896FA5EE-63A5-4EAC-9F09-346584BB4830}.Release|Any CPU.Build.0 = Release|Any CPU + {8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -238,6 +242,7 @@ Global {D0C7B22E-B0B6-4D62-BF7D-79EE4AAF1981} = {3A76C5A2-79ED-49BC-8BDC-6A3A766FFA1B} {B5286020-C218-443C-91A9-B65751FB9B29} = {8A4582C8-DC59-4B61-BCE7-119FBAA99EFB} {896FA5EE-63A5-4EAC-9F09-346584BB4830} = {DA69F624-5398-4884-87E4-B816698CDE65} + {8D3E3E7D-452B-44F4-86CA-111003EA11ED} = {8A4582C8-DC59-4B61-BCE7-119FBAA99EFB} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7945A4E4-ACDB-4F6E-95CA-6AC6E7C2CD59} diff --git a/benchmarks/BenchmarkServer/Hubs/EchoHub.cs b/benchmarks/BenchmarkServer/Hubs/EchoHub.cs index 6005dd677a..6233650a82 100644 --- a/benchmarks/BenchmarkServer/Hubs/EchoHub.cs +++ b/benchmarks/BenchmarkServer/Hubs/EchoHub.cs @@ -1,7 +1,8 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; @@ -39,5 +40,10 @@ namespace BenchmarkServer.Hubs { return Clients.All.SendAsync("send", time); } + + public void SendPayload(string payload) + { + // Dump the payload, we don't care + } } } diff --git a/benchmarks/BenchmarkServer/Program.cs b/benchmarks/BenchmarkServer/Program.cs index a280edb3b6..0232643b58 100644 --- a/benchmarks/BenchmarkServer/Program.cs +++ b/benchmarks/BenchmarkServer/Program.cs @@ -1,7 +1,8 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Diagnostics; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; @@ -12,6 +13,8 @@ namespace BenchmarkServer { public static void Main(string[] args) { + Console.WriteLine($"Process ID: {Process.GetCurrentProcess().Id}"); + var config = new ConfigurationBuilder() .AddEnvironmentVariables(prefix: "ASPNETCORE_") .AddCommandLine(args) diff --git a/benchmarks/BenchmarkServer/Startup.cs b/benchmarks/BenchmarkServer/Startup.cs index 22a336bf05..8074401d1b 100644 --- a/benchmarks/BenchmarkServer/Startup.cs +++ b/benchmarks/BenchmarkServer/Startup.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using BenchmarkServer.Hubs; diff --git a/benchmarks/Crankier/Agent.cs b/benchmarks/Crankier/Agent.cs new file mode 100644 index 0000000000..e31faa83cb --- /dev/null +++ b/benchmarks/Crankier/Agent.cs @@ -0,0 +1,302 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class Agent : IAgent + { + private readonly bool _workerWaitForDebugger; + private readonly string _hostName; + + private readonly ConcurrentDictionary _workers; + private readonly string _executable; + + public Agent(string executable = null, bool workerWaitForDebugger = false) + { + _workerWaitForDebugger = workerWaitForDebugger; + _executable = executable ?? GetMyExecutable(); + + Trace.Listeners.Add(new TextWriterTraceListener(Console.Out)); + + _hostName = Dns.GetHostName(); + + _workers = new ConcurrentDictionary(); + + Trace.WriteLine("Agent created"); + } + + private string GetMyExecutable() + { + var mainModuleFile = Process.GetCurrentProcess().MainModule.FileName; + if (Path.GetFileNameWithoutExtension(mainModuleFile).Equals("dotnet")) + { + // We're running in 'dotnet' + return Path.Combine(AppContext.BaseDirectory, $"{typeof(Program).Assembly.GetName().Name}.dll"); + } + else + { + // Standalone deployment + return mainModuleFile; + } + } + + public IRunner Runner { get; set; } + + public string TargetAddress { get; private set; } + + public int TotalConnectionsRequested { get; private set; } + + public bool ApplyingLoad { get; private set; } + + public AgentHeartbeatInformation GetHeartbeatInformation() + { + return new AgentHeartbeatInformation + { + HostName = _hostName, + TargetAddress = TargetAddress, + TotalConnectionsRequested = TotalConnectionsRequested, + ApplyingLoad = ApplyingLoad, + Workers = _workers.Select(worker => worker.Value.GetHeartbeatInformation()).ToList() + }; + } + + public Dictionary GetWorkerStatus() + { + return _workers.Values.ToDictionary( + k => k.Id, + v => v.StatusInformation); + } + + private AgentWorker CreateWorker() + { + var fileName = _executable; + var arguments = $"worker --agent {Process.GetCurrentProcess().Id}"; + if (_workerWaitForDebugger) + { + arguments += " --wait-for-debugger"; + } + if (fileName.EndsWith(".dll")) + { + // Execute using dotnet.exe + fileName = GetDotNetHost(); + arguments = _executable + " " + arguments; + } + + var startInfo = new ProcessStartInfo() + { + FileName = fileName, + Arguments = arguments, + CreateNoWindow = true, + UseShellExecute = false, + RedirectStandardInput = true, + RedirectStandardOutput = true, + RedirectStandardError = true + }; + + var worker = new AgentWorker(startInfo, this); + + worker.StatusInformation = new StatusInformation(); + + worker.Start(); + + worker.OnError += OnError; + worker.OnExit += OnExit; + + _workers.TryAdd(worker.Id, worker); + + return worker; + } + + private static string GetDotNetHost() => RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? "dotnet.exe" : "dotnet"; + + private async Task StartWorker(int id, string targetAddress, HttpTransportType transportType, int numberOfConnectionsPerWorker) + { + if (_workers.TryGetValue(id, out var worker)) + { + await worker.Worker.ConnectAsync(targetAddress, transportType, numberOfConnectionsPerWorker); + } + } + + public async Task StartWorkersAsync(string targetAddress, int numberOfWorkers, HttpTransportType transportType, int numberOfConnections) + { + TargetAddress = targetAddress; + TotalConnectionsRequested = numberOfConnections; + + var connectionsPerWorker = numberOfConnections / numberOfWorkers; + var remainingConnections = numberOfConnections % numberOfWorkers; + + async Task RunWorker(int index, AgentWorker worker) + { + if (index == 0) + { + await StartWorker(worker.Id, targetAddress, transportType, connectionsPerWorker + remainingConnections); + } + else + { + await StartWorker(worker.Id, targetAddress, transportType, connectionsPerWorker); + } + + await Runner.LogAgentAsync("Agent started listening to worker {0} ({1} of {2}).", worker.Id, index, numberOfWorkers); + } + + var workerTasks = new Task[numberOfWorkers]; + for (var index = 0; index < numberOfWorkers; index++) + { + workerTasks[index] = Task.Run(() => CreateWorker()); + } + + await Task.WhenAll(workerTasks); + + for (var index = 0; index < numberOfWorkers; index++) + { + _ = RunWorker(index, workerTasks[index].Result); + } + } + + public void KillWorker(int workerId) + { + if (_workers.TryGetValue(workerId, out var worker)) + { + worker.Kill(); + Runner.LogAgentAsync("Agent killed Worker {0}.", workerId); + } + } + + public void KillWorkers(int numberOfWorkersToKill) + { + var keys = _workers.Keys.Take(numberOfWorkersToKill).ToList(); + + foreach (var key in keys) + { + if (_workers.TryGetValue(key, out var worker)) + { + worker.Kill(); + Runner.LogAgentAsync("Agent killed Worker {0}.", key); + } + } + } + + public void KillConnections() + { + var keys = _workers.Keys.ToList(); + + foreach (var key in keys) + { + if (_workers.TryGetValue(key, out var worker)) + { + worker.Kill(); + Runner.LogAgentAsync("Agent killed Worker {0}.", key); + } + } + + TotalConnectionsRequested = 0; + ApplyingLoad = false; + } + + public void PingWorker(int workerId, int value) + { + if (_workers.TryGetValue(workerId, out var worker)) + { + worker.Worker.PingAsync(value); + Runner.LogAgentAsync("Agent sent ping command to Worker {0} with value {1}.", workerId, value); + } + else + { + Runner.LogAgentAsync("Agent failed to send ping command, Worker {0} not found.", workerId); + } + } + + public void StartTest(int messageSize, TimeSpan sendInterval) + { + ApplyingLoad = true; + + Task.Run(() => + { + foreach (var worker in _workers.Values) + { + worker.Worker.StartTestAsync(sendInterval, messageSize); + } + }); + } + + public void StopWorker(int workerId) + { + if (_workers.TryGetValue(workerId, out var worker)) + { + worker.Worker.StopAsync(); + } + } + + public async Task StopWorkersAsync() + { + var keys = _workers.Keys.ToList(); + + foreach (var key in keys) + { + if (_workers.TryGetValue(key, out var worker)) + { + await worker.Worker.StopAsync(); + await Runner.LogAgentAsync("Agent stopped Worker {0}.", key); + } + } + TotalConnectionsRequested = 0; + ApplyingLoad = false; + + // Wait for workers to terminate + while (_workers.Count > 0) + { + await Task.Delay(1000); + } + } + + public async Task PongAsync(int id, int value) + { + await Runner.LogAgentAsync("Agent received pong message from Worker {0} with value {1}.", id, value); + await Runner.PongWorkerAsync(id, value); + } + + public async Task LogAsync(int id, string text) + { + await Runner.LogWorkerAsync(id, text); + } + + public Task StatusAsync( + int id, + StatusInformation statusInformation) + { + if (_workers.TryGetValue(id, out var worker)) + { + worker.StatusInformation = statusInformation; + } + + return Task.CompletedTask; + } + + private void OnError(int workerId, Exception ex) + { + Runner.LogWorkerAsync(workerId, ex.Message); + } + + private void OnExit(int workerId, int exitCode) + { + _workers.TryRemove(workerId, out _); + var message = $"Worker {workerId} exited with exit code {exitCode}."; + Trace.WriteLine(message); + if (exitCode != 0) + { + throw new Exception(message); + } + } + } +} diff --git a/benchmarks/Crankier/AgentHeartbeatInformation.cs b/benchmarks/Crankier/AgentHeartbeatInformation.cs new file mode 100644 index 0000000000..5b2ee49df0 --- /dev/null +++ b/benchmarks/Crankier/AgentHeartbeatInformation.cs @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Collections.Generic; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class AgentHeartbeatInformation + { + public string HostName { get; set; } + + public string TargetAddress { get; set; } + + public int TotalConnectionsRequested { get; set; } + + public bool ApplyingLoad { get; set; } + + public List Workers { get; set; } + } +} diff --git a/benchmarks/Crankier/AgentReceiver.cs b/benchmarks/Crankier/AgentReceiver.cs new file mode 100644 index 0000000000..8938060e3f --- /dev/null +++ b/benchmarks/Crankier/AgentReceiver.cs @@ -0,0 +1,63 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.IO; +using System.Threading.Tasks; +using Newtonsoft.Json; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class AgentReceiver + { + private readonly StreamReader _reader; + private readonly IAgent _agent; + + public AgentReceiver(StreamReader reader, IAgent agent) + { + _reader = reader; + _agent = agent; + } + + public void Start() + { + Task.Run(async () => + { + var messageString = await _reader.ReadLineAsync(); + while (messageString != null) + { + try + { + var message = JsonConvert.DeserializeObject(messageString); + + switch (message.Command.ToLowerInvariant()) + { + case "pong": + await _agent.PongAsync( + message.Value["Id"].ToObject(), + message.Value["Value"].ToObject()); + break; + case "log": + await _agent.LogAsync( + message.Value["Id"].ToObject(), + message.Value["Text"].ToObject()); + break; + case "status": + await _agent.StatusAsync( + message.Value["Id"].ToObject(), + message.Value["StatusInformation"].ToObject()); + break; + } + } + catch (Exception ex) + { + Trace.WriteLine($"Error parsing '{messageString}': {ex.Message}"); + } + + messageString = await _reader.ReadLineAsync(); + } + }); + } + } +} diff --git a/benchmarks/Crankier/AgentSender.cs b/benchmarks/Crankier/AgentSender.cs new file mode 100644 index 0000000000..8433f60f52 --- /dev/null +++ b/benchmarks/Crankier/AgentSender.cs @@ -0,0 +1,76 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class AgentSender : IAgent + { + private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1); + private readonly StreamWriter _outputStreamWriter; + + public AgentSender(StreamWriter outputStreamWriter) + { + _outputStreamWriter = outputStreamWriter; + } + + public async Task PongAsync(int id, int value) + { + var parameters = new + { + Id = id, + Value = value + }; + + await SendAsync("pong", JToken.FromObject(parameters)); + } + + public async Task LogAsync(int id, string text) + { + var parameters = new + { + Id = id, + Text = text + }; + + await SendAsync("log", JToken.FromObject(parameters)); + } + + public async Task StatusAsync( + int id, + StatusInformation statusInformation) + { + var parameters = new + { + Id = id, + StatusInformation = statusInformation + }; + + await SendAsync("status", JToken.FromObject(parameters)); ; + } + + private async Task SendAsync(string method, JToken parameters) + { + await _lock.WaitAsync(); + try + { + await _outputStreamWriter.WriteLineAsync( + JsonConvert.SerializeObject(new Message + { + Command = method, + Value = parameters + })); + await _outputStreamWriter.FlushAsync(); + } + finally + { + _lock.Release(); + } + } + } +} diff --git a/benchmarks/Crankier/AgentWorker.cs b/benchmarks/Crankier/AgentWorker.cs new file mode 100644 index 0000000000..617fe56a00 --- /dev/null +++ b/benchmarks/Crankier/AgentWorker.cs @@ -0,0 +1,72 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class AgentWorker + { + private readonly Process _workerProcess; + private readonly IAgent _agent; + + public AgentWorker(ProcessStartInfo startInfo, IAgent agent) + { + _workerProcess = new Process(); + _workerProcess.StartInfo = startInfo; + _workerProcess.EnableRaisingEvents = true; + _workerProcess.Exited += OnExited; + _agent = agent; + } + + public int Id { get; private set; } + + public StatusInformation StatusInformation { get; set; } + + public Action OnError; + + public Action OnExit; + + public IWorker Worker { get; private set; } + + public WorkerHeartbeatInformation GetHeartbeatInformation() + { + return new WorkerHeartbeatInformation + { + Id = Id, + ConnectedCount = StatusInformation.ConnectedCount, + DisconnectedCount = StatusInformation.DisconnectedCount, + ReconnectingCount = StatusInformation.ReconnectingCount, + TargetConnectionCount = StatusInformation.TargetConnectionCount + }; + } + + public bool Start() + { + bool success = _workerProcess.Start(); + + if (success) + { + Id = _workerProcess.Id; + + var receiver = new AgentReceiver(_workerProcess.StandardOutput, _agent); + receiver.Start(); + + Worker = new WorkerSender(_workerProcess.StandardInput); + } + + return success; + } + + public void Kill() + { + _workerProcess.Kill(); + } + + private void OnExited(object sender, EventArgs args) + { + OnExit(Id, _workerProcess.ExitCode); + } + } +} diff --git a/benchmarks/Crankier/Client.cs b/benchmarks/Crankier/Client.cs new file mode 100644 index 0000000000..4aec217250 --- /dev/null +++ b/benchmarks/Crankier/Client.cs @@ -0,0 +1,128 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; +using Microsoft.AspNetCore.SignalR.Client; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class Client + { + private HubConnection _connection; + private CancellationTokenSource _sendCts; + private bool _sendInProgress; + private volatile ConnectionState _connectionState = ConnectionState.Connecting; + + public ConnectionState State => _connectionState; + public async Task CreateAndStartConnectionAsync(string url, HttpTransportType transportType) + { + _connection = new HubConnectionBuilder() + .WithUrl(url, options => options.Transports = transportType) + .Build(); + + _connection.Closed += (ex) => + { + if (ex == null) + { + Trace.WriteLine("Connection terminated"); + _connectionState = ConnectionState.Disconnected; + } + else + { + Trace.WriteLine($"Connection terminated with error: {ex.GetType()}: {ex.Message}"); + _connectionState = ConnectionState.Faulted; + } + }; + + _sendCts = new CancellationTokenSource(); + + await ConnectAsync(); + } + + private async Task ConnectAsync() + { + for (int connectCount = 0; connectCount <= 3; connectCount++) + { + try + { + await _connection.StartAsync(); + _connectionState = ConnectionState.Connected; + break; + } + catch (Exception ex) + { + Trace.WriteLine($"Connection.Start Failed: {ex.GetType()}: {ex.Message}"); + + if (connectCount == 3) + { + _connectionState = ConnectionState.Faulted; + throw; + } + } + + await Task.Delay(1000); + } + } + + public void StartTest(int sendSize, TimeSpan sendInterval) + { + var payload = (sendSize == 0) ? String.Empty : new string('a', sendSize); + + if (_sendInProgress) + { + _sendCts.Cancel(); + _sendCts = new CancellationTokenSource(); + } + else + { + _sendInProgress = true; + } + + if (!String.IsNullOrEmpty(payload)) + { + _ = Task.Run(async () => + { + while (!_sendCts.Token.IsCancellationRequested && State != ConnectionState.Disconnected) + { + try + { + await _connection.InvokeAsync("SendPayload", payload, _sendCts.Token); + } + // REVIEW: This is bad. We need a way to detect a closed connection when an Invocation fails! + catch (InvalidOperationException) + { + // The connection was closed. + Trace.WriteLine("Connection closed"); + break; + } + catch (OperationCanceledException) + { + // The connection was closed. + Trace.WriteLine("Connection closed"); + break; + } + catch (Exception ex) + { + // Connection failed + Trace.WriteLine($"Connection failed: {ex.GetType()}: {ex.Message}"); + throw; + } + + await Task.Delay(sendInterval); + } + }, _sendCts.Token); + } + } + + public Task StopConnectionAsync() + { + _sendCts.Cancel(); + + return _connection.StopAsync(); + } + } +} diff --git a/benchmarks/Crankier/Commands/AgentCommand.cs b/benchmarks/Crankier/Commands/AgentCommand.cs new file mode 100644 index 0000000000..f4629c630a --- /dev/null +++ b/benchmarks/Crankier/Commands/AgentCommand.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.Extensions.CommandLineUtils; + +using static Microsoft.AspNetCore.SignalR.Crankier.Commands.CommandLineUtilities; + +namespace Microsoft.AspNetCore.SignalR.Crankier.Commands +{ + internal class AgentCommand + { + public static void Register(CommandLineApplication app) + { + app.Command("agent", cmd => cmd.OnExecute(() => Fail("Not yet implemented"))); + } + } +} diff --git a/benchmarks/Crankier/Commands/CommandLineUtilities.cs b/benchmarks/Crankier/Commands/CommandLineUtilities.cs new file mode 100644 index 0000000000..6f07631e63 --- /dev/null +++ b/benchmarks/Crankier/Commands/CommandLineUtilities.cs @@ -0,0 +1,32 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using Microsoft.Extensions.CommandLineUtils; + +namespace Microsoft.AspNetCore.SignalR.Crankier.Commands +{ + internal static class CommandLineUtilities + { + public static int Fail(string message) + { + Error(message); + return 1; + } + + public static void Error(string message) + { + Console.WriteLine($"error: {message}"); + } + + public static int MissingRequiredArg(CommandOption option) + { + return Fail($"Missing required argument: {option.LongName}"); + } + + public static int InvalidArg(CommandOption option) + { + return Fail($"Invalid value '{option.Value()}' for argument: {option.LongName}"); + } + } +} diff --git a/benchmarks/Crankier/Commands/Defaults.cs b/benchmarks/Crankier/Commands/Defaults.cs new file mode 100644 index 0000000000..732ccb6af3 --- /dev/null +++ b/benchmarks/Crankier/Commands/Defaults.cs @@ -0,0 +1,15 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNetCore.Http.Connections; + +namespace Microsoft.AspNetCore.SignalR.Crankier.Commands +{ + internal static class Defaults + { + public static readonly int NumberOfWorkers = 1; + public static readonly int NumberOfConnections = 10_000; + public static readonly int SendDurationInSeconds = 300; + public static readonly HttpTransportType TransportType = HttpTransportType.WebSockets; + } +} diff --git a/benchmarks/Crankier/Commands/LocalCommand.cs b/benchmarks/Crankier/Commands/LocalCommand.cs new file mode 100644 index 0000000000..0d26a1c03b --- /dev/null +++ b/benchmarks/Crankier/Commands/LocalCommand.cs @@ -0,0 +1,79 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; +using Microsoft.Extensions.CommandLineUtils; + +using static Microsoft.AspNetCore.SignalR.Crankier.Commands.CommandLineUtilities; + +namespace Microsoft.AspNetCore.SignalR.Crankier.Commands +{ + internal class LocalCommand + { + public static void Register(CommandLineApplication app) + { + app.Command("local", cmd => + { + var targetUrlOption = cmd.Option("--target-url ", "The URL to run the test against.", CommandOptionType.SingleValue); + var numberOfWorkersOption = cmd.Option("--workers ", "The number of workers to use.", CommandOptionType.SingleValue); + var numberOfConnectionsOption = cmd.Option("--connections ", "The number of connections per worker to use.", CommandOptionType.SingleValue); + var sendDurationInSecondsOption = cmd.Option("--send-duration ", "The send duration to use.", CommandOptionType.SingleValue); + var transportTypeOption = cmd.Option("--transport ", "The transport to use (defaults to WebSockets).", CommandOptionType.SingleValue); + var workerWaitForDebuggerOption = cmd.Option("--worker-debug", "Provide this switch to have the worker wait for the debugger.", CommandOptionType.NoValue); + + cmd.OnExecute(async () => + { + if (!targetUrlOption.HasValue()) + { + return MissingRequiredArg(targetUrlOption); + } + + var numberOfWorkers = Defaults.NumberOfWorkers; + var numberOfConnections = Defaults.NumberOfConnections; + var sendDurationInSeconds = Defaults.SendDurationInSeconds; + var transportType = Defaults.TransportType; + + if (numberOfWorkersOption.HasValue() && !int.TryParse(numberOfWorkersOption.Value(), out numberOfWorkers)) + { + return MissingRequiredArg(numberOfWorkersOption); + } + + if (numberOfConnectionsOption.HasValue() && !int.TryParse(numberOfConnectionsOption.Value(), out numberOfConnections)) + { + return InvalidArg(numberOfConnectionsOption); + } + + if (sendDurationInSecondsOption.HasValue() && !int.TryParse(sendDurationInSecondsOption.Value(), out sendDurationInSeconds)) + { + return InvalidArg(sendDurationInSecondsOption); + } + + if (transportTypeOption.HasValue() && !Enum.TryParse(transportTypeOption.Value(), out transportType)) + { + return InvalidArg(transportTypeOption); + } + + return await Execute(targetUrlOption.Value(), numberOfWorkers, numberOfConnections, sendDurationInSeconds, transportType, workerWaitForDebuggerOption.HasValue()); + }); + }); + } + + private static async Task Execute(string targetUrl, int numberOfWorkers, int numberOfConnections, int sendDurationInSeconds, HttpTransportType transportType, bool workerWaitForDebugger) + { + var agent = new Agent(workerWaitForDebugger: workerWaitForDebugger); + var runner = new Runner(agent, targetUrl, numberOfWorkers, numberOfConnections, sendDurationInSeconds, transportType); + try + { + await runner.RunAsync(); + } + catch (Exception ex) + { + return Fail(ex.ToString()); + } + + return 0; + } + } +} diff --git a/benchmarks/Crankier/Commands/WorkerCommand.cs b/benchmarks/Crankier/Commands/WorkerCommand.cs new file mode 100644 index 0000000000..c7e1143bf4 --- /dev/null +++ b/benchmarks/Crankier/Commands/WorkerCommand.cs @@ -0,0 +1,60 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.CommandLineUtils; + +using static Microsoft.AspNetCore.SignalR.Crankier.Commands.CommandLineUtilities; + +namespace Microsoft.AspNetCore.SignalR.Crankier.Commands +{ + internal class WorkerCommand + { + public static void Register(CommandLineApplication app) + { + app.Command("worker", cmd => + { + var agentOption = cmd.Option("--agent ", "The process ID of the agent controlling this worker", CommandOptionType.SingleValue); + var waitForDebuggerOption = cmd.Option("--wait-for-debugger", "Provide this flag to have the worker wait for the debugger.", CommandOptionType.NoValue); + + cmd.OnExecute(async () => + { + if (!agentOption.HasValue()) + { + return MissingRequiredArg(agentOption); + } + + if (!int.TryParse(agentOption.Value(), out var agentPid)) + { + return InvalidArg(agentOption); + } + + if (waitForDebuggerOption.HasValue()) + { + SpinWait.SpinUntil(() => Debugger.IsAttached); + } + + return await Execute(agentPid); + }); + }); + } + + private static async Task Execute(int agentPid) + { + try + { + var worker = new Worker(agentPid); + await worker.RunAsync(); + } + catch (Exception ex) + { + return Fail(ex.ToString()); + } + + return 0; + } + } +} diff --git a/benchmarks/Crankier/ConnectionState.cs b/benchmarks/Crankier/ConnectionState.cs new file mode 100644 index 0000000000..2c7d39bae3 --- /dev/null +++ b/benchmarks/Crankier/ConnectionState.cs @@ -0,0 +1,14 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public enum ConnectionState + { + Connecting, + Connected, + Reconnecting, + Disconnected, + Faulted, + } +} diff --git a/benchmarks/Crankier/Crankier.csproj b/benchmarks/Crankier/Crankier.csproj new file mode 100644 index 0000000000..b4f4eeb920 --- /dev/null +++ b/benchmarks/Crankier/Crankier.csproj @@ -0,0 +1,18 @@ + + + + Exe + netcoreapp2.1 + Microsoft.AspNetCore.SignalR.CranksRevenge + + + + + + + + + + + + diff --git a/benchmarks/Crankier/IAgent.cs b/benchmarks/Crankier/IAgent.cs new file mode 100644 index 0000000000..84e6dffa7d --- /dev/null +++ b/benchmarks/Crankier/IAgent.cs @@ -0,0 +1,14 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public interface IAgent + { + Task PongAsync(int id, int value); + Task LogAsync(int id, string text); + Task StatusAsync(int id, StatusInformation statusInformation); + } +} diff --git a/benchmarks/Crankier/IRunner.cs b/benchmarks/Crankier/IRunner.cs new file mode 100644 index 0000000000..f3f8c2f8b1 --- /dev/null +++ b/benchmarks/Crankier/IRunner.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public interface IRunner + { + Task PongWorkerAsync(int workerId, int value); + + Task LogAgentAsync(string format, params object[] arguments); + + Task LogWorkerAsync(int workerId, string format, params object[] arguments); + } +} diff --git a/benchmarks/Crankier/IWorker.cs b/benchmarks/Crankier/IWorker.cs new file mode 100644 index 0000000000..2940862a0c --- /dev/null +++ b/benchmarks/Crankier/IWorker.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public interface IWorker + { + Task PingAsync(int value); + Task ConnectAsync(string targetAddress, HttpTransportType transportType, int numberOfConnections); + Task StartTestAsync(TimeSpan sendInterval, int sendBytes); + Task StopAsync(); + } +} diff --git a/benchmarks/Crankier/Message.cs b/benchmarks/Crankier/Message.cs new file mode 100644 index 0000000000..e2941a8440 --- /dev/null +++ b/benchmarks/Crankier/Message.cs @@ -0,0 +1,14 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Newtonsoft.Json.Linq; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class Message + { + public string Command { get; set; } + + public JToken Value { get; set; } + } +} diff --git a/benchmarks/Crankier/Program.cs b/benchmarks/Crankier/Program.cs new file mode 100644 index 0000000000..a3443ecd94 --- /dev/null +++ b/benchmarks/Crankier/Program.cs @@ -0,0 +1,54 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Linq; +using Microsoft.AspNetCore.SignalR.Crankier.Commands; +using Microsoft.Extensions.CommandLineUtils; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class Program + { + public static void Main(string[] args) + { + #if DEBUG + if (args.Any(a => string.Equals(a, "--debug", StringComparison.Ordinal))) + { + args = args.Where(a => !string.Equals(a, "--debug", StringComparison.Ordinal)).ToArray(); + Console.WriteLine($"Waiting for debugger. Process ID: {Process.GetCurrentProcess().Id}"); + Console.WriteLine("Press ENTER to continue"); + Console.ReadLine(); + } + #endif + + var app = new CommandLineApplication(); + app.Description = "Crank's Revenge"; + app.HelpOption("-h|--help"); + + LocalCommand.Register(app); + AgentCommand.Register(app); + WorkerCommand.Register(app); + + app.Command("help", cmd => + { + var commandArgument = cmd.Argument("", "The command to get help for."); + + cmd.OnExecute(() => + { + app.ShowHelp(commandArgument.Value); + return 0; + }); + }); + + app.OnExecute(() => + { + app.ShowHelp(); + return 0; + }); + + app.Execute(args); + } + } +} diff --git a/benchmarks/Crankier/Runner.cs b/benchmarks/Crankier/Runner.cs new file mode 100644 index 0000000000..d1e9667f78 --- /dev/null +++ b/benchmarks/Crankier/Runner.cs @@ -0,0 +1,104 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; +using Newtonsoft.Json; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class Runner : IRunner + { + private readonly Agent _agent; + private readonly string _targetUrl; + private readonly int _numberOfWorkers; + private readonly int _numberOfConnections; + private readonly int _sendDurationSeconds; + private readonly HttpTransportType _transportType; + + public Runner(Agent agent, string targetUrl, int numberOfWorkers, int numberOfConnections, int sendDurationInSeconds, HttpTransportType transportType) + { + _agent = agent; + _targetUrl = targetUrl; + _numberOfWorkers = numberOfWorkers; + _numberOfConnections = numberOfConnections; + _sendDurationSeconds = sendDurationInSeconds; + _transportType = transportType; + } + + public async Task RunAsync() + { + _agent.Runner = this; + + await _agent.StartWorkersAsync(_targetUrl, _numberOfWorkers, _transportType, _numberOfConnections); + + // Begin writing worker status information + var writeStatusCts = new CancellationTokenSource(); + var writeStatusTask = WriteConnectionStatusAsync(writeStatusCts.Token); + + // Wait until all connections are connected + while (_agent.GetWorkerStatus().Aggregate(0, (state, status) => state + status.Value.ConnectedCount) < + _agent.TotalConnectionsRequested) + { + await Task.Delay(1000); + } + + // Stay connected for the duration of the send phase + await Task.Delay(TimeSpan.FromSeconds(_sendDurationSeconds)); + + // Disconnect + await _agent.StopWorkersAsync(); + + // Stop writing worker status information + writeStatusCts.Cancel(); + await writeStatusTask; + } + + private Task WriteConnectionStatusAsync(CancellationToken cancellationToken) + { + return Task.Run(async () => + { + var peakConnections = 0; + while (!cancellationToken.IsCancellationRequested) + { + var statusDictionary = _agent.GetWorkerStatus(); + + // Total things up + var status = new StatusInformation(); + foreach (var value in statusDictionary.Values) + { + status = status.Add(value); + } + + peakConnections = Math.Max(peakConnections, status.ConnectedCount); + status.PeakConnections = peakConnections; + + Trace.WriteLine(JsonConvert.SerializeObject(status)); + + await Task.Delay(1000); + } + }); + } + + public Task PongWorkerAsync(int workerId, int value) + { + throw new NotImplementedException(); + } + + public Task LogAgentAsync(string format, params object[] arguments) + { + Trace.WriteLine(string.Format(format, arguments)); + return Task.CompletedTask; + } + + public Task LogWorkerAsync(int workerId, string format, params object[] arguments) + { + Trace.WriteLine(string.Format("({0}) {1}", workerId, string.Format(format, arguments))); + return Task.CompletedTask; + } + } +} diff --git a/benchmarks/Crankier/StatusInformation.cs b/benchmarks/Crankier/StatusInformation.cs new file mode 100644 index 0000000000..1114b12a44 --- /dev/null +++ b/benchmarks/Crankier/StatusInformation.cs @@ -0,0 +1,32 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class StatusInformation + { + public int ConnectingCount { get; set; } + public int ConnectedCount { get; set; } + public int DisconnectedCount { get; set; } + public int ReconnectingCount { get; set; } + public int FaultedCount { get; set; } + public int TargetConnectionCount { get; set; } + + + // Set by agent. + public int PeakConnections { get; set;} + + public StatusInformation Add(StatusInformation value) + { + return new StatusInformation() + { + ConnectingCount = ConnectingCount + value.ConnectingCount, + ConnectedCount = ConnectedCount + value.ConnectedCount, + DisconnectedCount = DisconnectedCount + value.DisconnectedCount, + ReconnectingCount = ReconnectingCount + value.ReconnectingCount, + FaultedCount = FaultedCount + value.FaultedCount, + TargetConnectionCount = TargetConnectionCount + value.TargetConnectionCount, + }; + } + } +} diff --git a/benchmarks/Crankier/Worker.cs b/benchmarks/Crankier/Worker.cs new file mode 100644 index 0000000000..ed41709659 --- /dev/null +++ b/benchmarks/Crankier/Worker.cs @@ -0,0 +1,173 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class Worker : IWorker + { + private readonly Process _agentProcess; + private readonly IAgent _agent; + private readonly int _processId; + private readonly ConcurrentBag _clients; + private readonly CancellationTokenSource _sendStatusCts; + private int _targetConnectionCount; + + public Worker(int agentProcessId) + { + _agentProcess = Process.GetProcessById(agentProcessId); + _agent = new AgentSender(new StreamWriter(Console.OpenStandardOutput())); + _processId = Process.GetCurrentProcess().Id; + _clients = new ConcurrentBag(); + _sendStatusCts = new CancellationTokenSource(); + } + + public async Task RunAsync() + { + _agentProcess.EnableRaisingEvents = true; + _agentProcess.Exited += OnExited; + + Log("Worker created"); + + var receiver = new WorkerReceiver( + new StreamReader(Console.OpenStandardInput()), + this); + + receiver.Start(); + + await SendStatusUpdateAsync(_sendStatusCts.Token); + + receiver.Stop(); + } + + public async Task PingAsync(int value) + { + Log("Worker received ping command with value {0}.", value); + + await _agent.PongAsync(_processId, value); + Log("Worker sent pong command with value {0}.", value); + } + + public async Task ConnectAsync(string targetAddress, HttpTransportType transportType, int numberOfConnections) + { + Log("Worker received connect command with target address {0} and number of connections {1}", targetAddress, numberOfConnections); + + _targetConnectionCount += numberOfConnections; + for (var count = 0; count < numberOfConnections; count++) + { + var client = new Client(); + _clients.Add(client); + + await client.CreateAndStartConnectionAsync(targetAddress, transportType); + } + + Log("Connections connected succesfully"); + } + + public Task StartTestAsync(TimeSpan sendInterval, int sendBytes) + { + Log("Worker received start test command with interval {0} and message size {1}.", sendInterval, sendBytes); + + foreach (var client in _clients) + { + client.StartTest(sendBytes, sendInterval); + } + + Log("Test started succesfully"); + return Task.CompletedTask; + } + + public Task StopAsync() + { + Log("Worker received stop command"); + _targetConnectionCount = 0; + + while (!_clients.IsEmpty) + { + if (_clients.TryTake(out var client)) + { + client.StopConnectionAsync(); + } + } + + _sendStatusCts.Cancel(); + Log("Connections stopped succesfully"); + _targetConnectionCount = 0; + + return Task.CompletedTask; + } + + private void OnExited(object sender, EventArgs args) + { + Environment.Exit(0); + } + + private void Log(string format, params object[] arguments) + { + _agent.LogAsync(_processId, string.Format(format, arguments)); + } + + private async Task SendStatusUpdateAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + var connectedCount = 0; + var connectingCount = 0; + var disconnectedCount = 0; + var reconnectingCount = 0; + var faultedCount = 0; + + foreach (var client in _clients) + { + switch (client.State) + { + case ConnectionState.Connecting: + connectingCount++; + break; + case ConnectionState.Connected: + connectedCount++; + break; + case ConnectionState.Disconnected: + disconnectedCount++; + break; + case ConnectionState.Reconnecting: + reconnectingCount++; + break; + case ConnectionState.Faulted: + faultedCount++; + break; + } + } + + await _agent.StatusAsync( + _processId, + new StatusInformation + { + ConnectingCount = connectingCount, + ConnectedCount = connectedCount, + DisconnectedCount = disconnectedCount, + ReconnectingCount = reconnectingCount, + TargetConnectionCount = _targetConnectionCount, + FaultedCount = faultedCount, + } + ); + + // Sending once per 5 seconds to avoid overloading the Test Controller + try + { + await Task.Delay(5000, cancellationToken); + } + catch (TaskCanceledException) + { + } + } + } + } +} diff --git a/benchmarks/Crankier/WorkerHeartbeatInformation.cs b/benchmarks/Crankier/WorkerHeartbeatInformation.cs new file mode 100644 index 0000000000..fc639d5a5d --- /dev/null +++ b/benchmarks/Crankier/WorkerHeartbeatInformation.cs @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class WorkerHeartbeatInformation + { + public int Id { get; set; } + + public int ConnectedCount { get; set; } + + public int DisconnectedCount { get; set; } + + public int ReconnectingCount { get; set; } + + public int TargetConnectionCount { get; set; } + } +} diff --git a/benchmarks/Crankier/WorkerReceiver.cs b/benchmarks/Crankier/WorkerReceiver.cs new file mode 100644 index 0000000000..18a391d240 --- /dev/null +++ b/benchmarks/Crankier/WorkerReceiver.cs @@ -0,0 +1,78 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; +using Newtonsoft.Json; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class WorkerReceiver + { + private readonly StreamReader _reader; + private readonly IWorker _worker; + private CancellationTokenSource _receiveMessageCts; + + public WorkerReceiver(StreamReader reader, IWorker worker) + { + _reader = reader; + _worker = worker; + } + + public void Start() + { + if (_receiveMessageCts != null) + { + _receiveMessageCts.Cancel(); + } + + _receiveMessageCts = new CancellationTokenSource(); + Task.Run(async () => + { + while (!_receiveMessageCts.Token.IsCancellationRequested) + { + var messageString = await _reader.ReadLineAsync(); + try + { + var message = JsonConvert.DeserializeObject(messageString); + + switch (message.Command.ToLowerInvariant()) + { + case "ping": + await _worker.PingAsync( + message.Value["Value"].ToObject()); + break; + case "connect": + await _worker.ConnectAsync( + message.Value["TargetAddress"].ToObject(), + message.Value["TransportType"].ToObject(), + message.Value["NumberOfConnections"].ToObject()); + break; + case "starttest": + await _worker.StartTestAsync( + TimeSpan.FromMilliseconds(message.Value.Value("SendInterval")), + message.Value["SendBytes"].ToObject()); + break; + case "stop": + await _worker.StopAsync(); + break; + } + } + catch (Exception ex) + { + Trace.WriteLine(ex.Message); + } + } + }); + } + + public void Stop() + { + _receiveMessageCts.Cancel(); + } + } +} diff --git a/benchmarks/Crankier/WorkerSender.cs b/benchmarks/Crankier/WorkerSender.cs new file mode 100644 index 0000000000..17e6f57eda --- /dev/null +++ b/benchmarks/Crankier/WorkerSender.cs @@ -0,0 +1,70 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http.Connections; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.AspNetCore.SignalR.Crankier +{ + public class WorkerSender : IWorker + { + private readonly StreamWriter _outputStreamWriter; + + public WorkerSender(StreamWriter outputStreamWriter) + { + _outputStreamWriter = outputStreamWriter; + } + + public async Task PingAsync(int value) + { + await Send("ping", JToken.FromObject( + new + { + Value = value + })); + } + + public async Task ConnectAsync(string targetAddress, HttpTransportType transportType, int numberOfConnections) + { + await Send("connect", JToken.FromObject( + new + { + TargetAddress = targetAddress, + TransportType = transportType, + NumberOfConnections = numberOfConnections + })); + } + + public async Task StartTestAsync(TimeSpan sendInterval, int sendBytes) + { + var parameters = new + { + SendInterval = sendInterval.TotalMilliseconds, + SendBytes = sendBytes + }; + + await Send("starttest", JToken.FromObject(parameters)); + } + + public async Task StopAsync() + { + await Send("stop", null); + } + + private async Task Send(string method, JToken parameters) + { + await _outputStreamWriter.WriteLineAsync( + JsonConvert.SerializeObject(new Message() + { + Command = method, + Value = parameters + })); + + await _outputStreamWriter.FlushAsync(); + } + } +}