Crank 2: Let's Get Crankier (#1888)

Ports Crank, from https://github.com/SignalR/DCrank to ASP.NET Core SignalR
This commit is contained in:
Andrew Stanton-Nurse 2018-04-09 15:46:26 -07:00 committed by GitHub
parent 01089da84d
commit ecd665c471
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1505 additions and 5 deletions

View File

@ -19,8 +19,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{C4BC9889-B49F-41B6-806B-F84941B2549B}"
ProjectSection(SolutionItems) = preProject
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SignalRSamples", "samples\SignalRSamples\SignalRSamples.csproj", "{C4AEAB04-F341-4539-B6C0-52368FB4BF9E}"
EndProject
@ -87,6 +85,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BenchmarkServer", "benchmar
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Protocols.Json", "src\Microsoft.AspNetCore.SignalR.Protocols.Json\Microsoft.AspNetCore.SignalR.Protocols.Json.csproj", "{896FA5EE-63A5-4EAC-9F09-346584BB4830}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Crankier", "benchmarks\Crankier\Crankier.csproj", "{8D3E3E7D-452B-44F4-86CA-111003EA11ED}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -205,6 +205,10 @@ Global
{896FA5EE-63A5-4EAC-9F09-346584BB4830}.Debug|Any CPU.Build.0 = Debug|Any CPU
{896FA5EE-63A5-4EAC-9F09-346584BB4830}.Release|Any CPU.ActiveCfg = Release|Any CPU
{896FA5EE-63A5-4EAC-9F09-346584BB4830}.Release|Any CPU.Build.0 = Release|Any CPU
{8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8D3E3E7D-452B-44F4-86CA-111003EA11ED}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -238,6 +242,7 @@ Global
{D0C7B22E-B0B6-4D62-BF7D-79EE4AAF1981} = {3A76C5A2-79ED-49BC-8BDC-6A3A766FFA1B}
{B5286020-C218-443C-91A9-B65751FB9B29} = {8A4582C8-DC59-4B61-BCE7-119FBAA99EFB}
{896FA5EE-63A5-4EAC-9F09-346584BB4830} = {DA69F624-5398-4884-87E4-B816698CDE65}
{8D3E3E7D-452B-44F4-86CA-111003EA11ED} = {8A4582C8-DC59-4B61-BCE7-119FBAA99EFB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7945A4E4-ACDB-4F6E-95CA-6AC6E7C2CD59}

View File

@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
@ -39,5 +40,10 @@ namespace BenchmarkServer.Hubs
{
return Clients.All.SendAsync("send", time);
}
public void SendPayload(string payload)
{
// Dump the payload, we don't care
}
}
}

View File

@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
@ -12,6 +13,8 @@ namespace BenchmarkServer
{
public static void Main(string[] args)
{
Console.WriteLine($"Process ID: {Process.GetCurrentProcess().Id}");
var config = new ConfigurationBuilder()
.AddEnvironmentVariables(prefix: "ASPNETCORE_")
.AddCommandLine(args)

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using BenchmarkServer.Hubs;

View File

@ -0,0 +1,302 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class Agent : IAgent
{
private readonly bool _workerWaitForDebugger;
private readonly string _hostName;
private readonly ConcurrentDictionary<int, AgentWorker> _workers;
private readonly string _executable;
public Agent(string executable = null, bool workerWaitForDebugger = false)
{
_workerWaitForDebugger = workerWaitForDebugger;
_executable = executable ?? GetMyExecutable();
Trace.Listeners.Add(new TextWriterTraceListener(Console.Out));
_hostName = Dns.GetHostName();
_workers = new ConcurrentDictionary<int, AgentWorker>();
Trace.WriteLine("Agent created");
}
private string GetMyExecutable()
{
var mainModuleFile = Process.GetCurrentProcess().MainModule.FileName;
if (Path.GetFileNameWithoutExtension(mainModuleFile).Equals("dotnet"))
{
// We're running in 'dotnet'
return Path.Combine(AppContext.BaseDirectory, $"{typeof(Program).Assembly.GetName().Name}.dll");
}
else
{
// Standalone deployment
return mainModuleFile;
}
}
public IRunner Runner { get; set; }
public string TargetAddress { get; private set; }
public int TotalConnectionsRequested { get; private set; }
public bool ApplyingLoad { get; private set; }
public AgentHeartbeatInformation GetHeartbeatInformation()
{
return new AgentHeartbeatInformation
{
HostName = _hostName,
TargetAddress = TargetAddress,
TotalConnectionsRequested = TotalConnectionsRequested,
ApplyingLoad = ApplyingLoad,
Workers = _workers.Select(worker => worker.Value.GetHeartbeatInformation()).ToList()
};
}
public Dictionary<int, StatusInformation> GetWorkerStatus()
{
return _workers.Values.ToDictionary(
k => k.Id,
v => v.StatusInformation);
}
private AgentWorker CreateWorker()
{
var fileName = _executable;
var arguments = $"worker --agent {Process.GetCurrentProcess().Id}";
if (_workerWaitForDebugger)
{
arguments += " --wait-for-debugger";
}
if (fileName.EndsWith(".dll"))
{
// Execute using dotnet.exe
fileName = GetDotNetHost();
arguments = _executable + " " + arguments;
}
var startInfo = new ProcessStartInfo()
{
FileName = fileName,
Arguments = arguments,
CreateNoWindow = true,
UseShellExecute = false,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true
};
var worker = new AgentWorker(startInfo, this);
worker.StatusInformation = new StatusInformation();
worker.Start();
worker.OnError += OnError;
worker.OnExit += OnExit;
_workers.TryAdd(worker.Id, worker);
return worker;
}
private static string GetDotNetHost() => RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? "dotnet.exe" : "dotnet";
private async Task StartWorker(int id, string targetAddress, HttpTransportType transportType, int numberOfConnectionsPerWorker)
{
if (_workers.TryGetValue(id, out var worker))
{
await worker.Worker.ConnectAsync(targetAddress, transportType, numberOfConnectionsPerWorker);
}
}
public async Task StartWorkersAsync(string targetAddress, int numberOfWorkers, HttpTransportType transportType, int numberOfConnections)
{
TargetAddress = targetAddress;
TotalConnectionsRequested = numberOfConnections;
var connectionsPerWorker = numberOfConnections / numberOfWorkers;
var remainingConnections = numberOfConnections % numberOfWorkers;
async Task RunWorker(int index, AgentWorker worker)
{
if (index == 0)
{
await StartWorker(worker.Id, targetAddress, transportType, connectionsPerWorker + remainingConnections);
}
else
{
await StartWorker(worker.Id, targetAddress, transportType, connectionsPerWorker);
}
await Runner.LogAgentAsync("Agent started listening to worker {0} ({1} of {2}).", worker.Id, index, numberOfWorkers);
}
var workerTasks = new Task<AgentWorker>[numberOfWorkers];
for (var index = 0; index < numberOfWorkers; index++)
{
workerTasks[index] = Task.Run(() => CreateWorker());
}
await Task.WhenAll(workerTasks);
for (var index = 0; index < numberOfWorkers; index++)
{
_ = RunWorker(index, workerTasks[index].Result);
}
}
public void KillWorker(int workerId)
{
if (_workers.TryGetValue(workerId, out var worker))
{
worker.Kill();
Runner.LogAgentAsync("Agent killed Worker {0}.", workerId);
}
}
public void KillWorkers(int numberOfWorkersToKill)
{
var keys = _workers.Keys.Take(numberOfWorkersToKill).ToList();
foreach (var key in keys)
{
if (_workers.TryGetValue(key, out var worker))
{
worker.Kill();
Runner.LogAgentAsync("Agent killed Worker {0}.", key);
}
}
}
public void KillConnections()
{
var keys = _workers.Keys.ToList();
foreach (var key in keys)
{
if (_workers.TryGetValue(key, out var worker))
{
worker.Kill();
Runner.LogAgentAsync("Agent killed Worker {0}.", key);
}
}
TotalConnectionsRequested = 0;
ApplyingLoad = false;
}
public void PingWorker(int workerId, int value)
{
if (_workers.TryGetValue(workerId, out var worker))
{
worker.Worker.PingAsync(value);
Runner.LogAgentAsync("Agent sent ping command to Worker {0} with value {1}.", workerId, value);
}
else
{
Runner.LogAgentAsync("Agent failed to send ping command, Worker {0} not found.", workerId);
}
}
public void StartTest(int messageSize, TimeSpan sendInterval)
{
ApplyingLoad = true;
Task.Run(() =>
{
foreach (var worker in _workers.Values)
{
worker.Worker.StartTestAsync(sendInterval, messageSize);
}
});
}
public void StopWorker(int workerId)
{
if (_workers.TryGetValue(workerId, out var worker))
{
worker.Worker.StopAsync();
}
}
public async Task StopWorkersAsync()
{
var keys = _workers.Keys.ToList();
foreach (var key in keys)
{
if (_workers.TryGetValue(key, out var worker))
{
await worker.Worker.StopAsync();
await Runner.LogAgentAsync("Agent stopped Worker {0}.", key);
}
}
TotalConnectionsRequested = 0;
ApplyingLoad = false;
// Wait for workers to terminate
while (_workers.Count > 0)
{
await Task.Delay(1000);
}
}
public async Task PongAsync(int id, int value)
{
await Runner.LogAgentAsync("Agent received pong message from Worker {0} with value {1}.", id, value);
await Runner.PongWorkerAsync(id, value);
}
public async Task LogAsync(int id, string text)
{
await Runner.LogWorkerAsync(id, text);
}
public Task StatusAsync(
int id,
StatusInformation statusInformation)
{
if (_workers.TryGetValue(id, out var worker))
{
worker.StatusInformation = statusInformation;
}
return Task.CompletedTask;
}
private void OnError(int workerId, Exception ex)
{
Runner.LogWorkerAsync(workerId, ex.Message);
}
private void OnExit(int workerId, int exitCode)
{
_workers.TryRemove(workerId, out _);
var message = $"Worker {workerId} exited with exit code {exitCode}.";
Trace.WriteLine(message);
if (exitCode != 0)
{
throw new Exception(message);
}
}
}
}

View File

@ -0,0 +1,20 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class AgentHeartbeatInformation
{
public string HostName { get; set; }
public string TargetAddress { get; set; }
public int TotalConnectionsRequested { get; set; }
public bool ApplyingLoad { get; set; }
public List<WorkerHeartbeatInformation> Workers { get; set; }
}
}

View File

@ -0,0 +1,63 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class AgentReceiver
{
private readonly StreamReader _reader;
private readonly IAgent _agent;
public AgentReceiver(StreamReader reader, IAgent agent)
{
_reader = reader;
_agent = agent;
}
public void Start()
{
Task.Run(async () =>
{
var messageString = await _reader.ReadLineAsync();
while (messageString != null)
{
try
{
var message = JsonConvert.DeserializeObject<Message>(messageString);
switch (message.Command.ToLowerInvariant())
{
case "pong":
await _agent.PongAsync(
message.Value["Id"].ToObject<int>(),
message.Value["Value"].ToObject<int>());
break;
case "log":
await _agent.LogAsync(
message.Value["Id"].ToObject<int>(),
message.Value["Text"].ToObject<string>());
break;
case "status":
await _agent.StatusAsync(
message.Value["Id"].ToObject<int>(),
message.Value["StatusInformation"].ToObject<StatusInformation>());
break;
}
}
catch (Exception ex)
{
Trace.WriteLine($"Error parsing '{messageString}': {ex.Message}");
}
messageString = await _reader.ReadLineAsync();
}
});
}
}
}

View File

@ -0,0 +1,76 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class AgentSender : IAgent
{
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
private readonly StreamWriter _outputStreamWriter;
public AgentSender(StreamWriter outputStreamWriter)
{
_outputStreamWriter = outputStreamWriter;
}
public async Task PongAsync(int id, int value)
{
var parameters = new
{
Id = id,
Value = value
};
await SendAsync("pong", JToken.FromObject(parameters));
}
public async Task LogAsync(int id, string text)
{
var parameters = new
{
Id = id,
Text = text
};
await SendAsync("log", JToken.FromObject(parameters));
}
public async Task StatusAsync(
int id,
StatusInformation statusInformation)
{
var parameters = new
{
Id = id,
StatusInformation = statusInformation
};
await SendAsync("status", JToken.FromObject(parameters)); ;
}
private async Task SendAsync(string method, JToken parameters)
{
await _lock.WaitAsync();
try
{
await _outputStreamWriter.WriteLineAsync(
JsonConvert.SerializeObject(new Message
{
Command = method,
Value = parameters
}));
await _outputStreamWriter.FlushAsync();
}
finally
{
_lock.Release();
}
}
}
}

View File

@ -0,0 +1,72 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class AgentWorker
{
private readonly Process _workerProcess;
private readonly IAgent _agent;
public AgentWorker(ProcessStartInfo startInfo, IAgent agent)
{
_workerProcess = new Process();
_workerProcess.StartInfo = startInfo;
_workerProcess.EnableRaisingEvents = true;
_workerProcess.Exited += OnExited;
_agent = agent;
}
public int Id { get; private set; }
public StatusInformation StatusInformation { get; set; }
public Action<int, Exception> OnError;
public Action<int, int> OnExit;
public IWorker Worker { get; private set; }
public WorkerHeartbeatInformation GetHeartbeatInformation()
{
return new WorkerHeartbeatInformation
{
Id = Id,
ConnectedCount = StatusInformation.ConnectedCount,
DisconnectedCount = StatusInformation.DisconnectedCount,
ReconnectingCount = StatusInformation.ReconnectingCount,
TargetConnectionCount = StatusInformation.TargetConnectionCount
};
}
public bool Start()
{
bool success = _workerProcess.Start();
if (success)
{
Id = _workerProcess.Id;
var receiver = new AgentReceiver(_workerProcess.StandardOutput, _agent);
receiver.Start();
Worker = new WorkerSender(_workerProcess.StandardInput);
}
return success;
}
public void Kill()
{
_workerProcess.Kill();
}
private void OnExited(object sender, EventArgs args)
{
OnExit(Id, _workerProcess.ExitCode);
}
}
}

View File

@ -0,0 +1,128 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class Client
{
private HubConnection _connection;
private CancellationTokenSource _sendCts;
private bool _sendInProgress;
private volatile ConnectionState _connectionState = ConnectionState.Connecting;
public ConnectionState State => _connectionState;
public async Task CreateAndStartConnectionAsync(string url, HttpTransportType transportType)
{
_connection = new HubConnectionBuilder()
.WithUrl(url, options => options.Transports = transportType)
.Build();
_connection.Closed += (ex) =>
{
if (ex == null)
{
Trace.WriteLine("Connection terminated");
_connectionState = ConnectionState.Disconnected;
}
else
{
Trace.WriteLine($"Connection terminated with error: {ex.GetType()}: {ex.Message}");
_connectionState = ConnectionState.Faulted;
}
};
_sendCts = new CancellationTokenSource();
await ConnectAsync();
}
private async Task ConnectAsync()
{
for (int connectCount = 0; connectCount <= 3; connectCount++)
{
try
{
await _connection.StartAsync();
_connectionState = ConnectionState.Connected;
break;
}
catch (Exception ex)
{
Trace.WriteLine($"Connection.Start Failed: {ex.GetType()}: {ex.Message}");
if (connectCount == 3)
{
_connectionState = ConnectionState.Faulted;
throw;
}
}
await Task.Delay(1000);
}
}
public void StartTest(int sendSize, TimeSpan sendInterval)
{
var payload = (sendSize == 0) ? String.Empty : new string('a', sendSize);
if (_sendInProgress)
{
_sendCts.Cancel();
_sendCts = new CancellationTokenSource();
}
else
{
_sendInProgress = true;
}
if (!String.IsNullOrEmpty(payload))
{
_ = Task.Run(async () =>
{
while (!_sendCts.Token.IsCancellationRequested && State != ConnectionState.Disconnected)
{
try
{
await _connection.InvokeAsync("SendPayload", payload, _sendCts.Token);
}
// REVIEW: This is bad. We need a way to detect a closed connection when an Invocation fails!
catch (InvalidOperationException)
{
// The connection was closed.
Trace.WriteLine("Connection closed");
break;
}
catch (OperationCanceledException)
{
// The connection was closed.
Trace.WriteLine("Connection closed");
break;
}
catch (Exception ex)
{
// Connection failed
Trace.WriteLine($"Connection failed: {ex.GetType()}: {ex.Message}");
throw;
}
await Task.Delay(sendInterval);
}
}, _sendCts.Token);
}
}
public Task StopConnectionAsync()
{
_sendCts.Cancel();
return _connection.StopAsync();
}
}
}

View File

@ -0,0 +1,17 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.Extensions.CommandLineUtils;
using static Microsoft.AspNetCore.SignalR.Crankier.Commands.CommandLineUtilities;
namespace Microsoft.AspNetCore.SignalR.Crankier.Commands
{
internal class AgentCommand
{
public static void Register(CommandLineApplication app)
{
app.Command("agent", cmd => cmd.OnExecute(() => Fail("Not yet implemented")));
}
}
}

View File

@ -0,0 +1,32 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.Extensions.CommandLineUtils;
namespace Microsoft.AspNetCore.SignalR.Crankier.Commands
{
internal static class CommandLineUtilities
{
public static int Fail(string message)
{
Error(message);
return 1;
}
public static void Error(string message)
{
Console.WriteLine($"error: {message}");
}
public static int MissingRequiredArg(CommandOption option)
{
return Fail($"Missing required argument: {option.LongName}");
}
public static int InvalidArg(CommandOption option)
{
return Fail($"Invalid value '{option.Value()}' for argument: {option.LongName}");
}
}
}

View File

@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.Http.Connections;
namespace Microsoft.AspNetCore.SignalR.Crankier.Commands
{
internal static class Defaults
{
public static readonly int NumberOfWorkers = 1;
public static readonly int NumberOfConnections = 10_000;
public static readonly int SendDurationInSeconds = 300;
public static readonly HttpTransportType TransportType = HttpTransportType.WebSockets;
}
}

View File

@ -0,0 +1,79 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.Extensions.CommandLineUtils;
using static Microsoft.AspNetCore.SignalR.Crankier.Commands.CommandLineUtilities;
namespace Microsoft.AspNetCore.SignalR.Crankier.Commands
{
internal class LocalCommand
{
public static void Register(CommandLineApplication app)
{
app.Command("local", cmd =>
{
var targetUrlOption = cmd.Option("--target-url <TARGET_URL>", "The URL to run the test against.", CommandOptionType.SingleValue);
var numberOfWorkersOption = cmd.Option("--workers <WORKER_COUNT>", "The number of workers to use.", CommandOptionType.SingleValue);
var numberOfConnectionsOption = cmd.Option("--connections <CONNECTION_COUNT>", "The number of connections per worker to use.", CommandOptionType.SingleValue);
var sendDurationInSecondsOption = cmd.Option("--send-duration <SEND_DURATION_IN_SECONDS>", "The send duration to use.", CommandOptionType.SingleValue);
var transportTypeOption = cmd.Option("--transport <TRANSPORT>", "The transport to use (defaults to WebSockets).", CommandOptionType.SingleValue);
var workerWaitForDebuggerOption = cmd.Option("--worker-debug", "Provide this switch to have the worker wait for the debugger.", CommandOptionType.NoValue);
cmd.OnExecute(async () =>
{
if (!targetUrlOption.HasValue())
{
return MissingRequiredArg(targetUrlOption);
}
var numberOfWorkers = Defaults.NumberOfWorkers;
var numberOfConnections = Defaults.NumberOfConnections;
var sendDurationInSeconds = Defaults.SendDurationInSeconds;
var transportType = Defaults.TransportType;
if (numberOfWorkersOption.HasValue() && !int.TryParse(numberOfWorkersOption.Value(), out numberOfWorkers))
{
return MissingRequiredArg(numberOfWorkersOption);
}
if (numberOfConnectionsOption.HasValue() && !int.TryParse(numberOfConnectionsOption.Value(), out numberOfConnections))
{
return InvalidArg(numberOfConnectionsOption);
}
if (sendDurationInSecondsOption.HasValue() && !int.TryParse(sendDurationInSecondsOption.Value(), out sendDurationInSeconds))
{
return InvalidArg(sendDurationInSecondsOption);
}
if (transportTypeOption.HasValue() && !Enum.TryParse(transportTypeOption.Value(), out transportType))
{
return InvalidArg(transportTypeOption);
}
return await Execute(targetUrlOption.Value(), numberOfWorkers, numberOfConnections, sendDurationInSeconds, transportType, workerWaitForDebuggerOption.HasValue());
});
});
}
private static async Task<int> Execute(string targetUrl, int numberOfWorkers, int numberOfConnections, int sendDurationInSeconds, HttpTransportType transportType, bool workerWaitForDebugger)
{
var agent = new Agent(workerWaitForDebugger: workerWaitForDebugger);
var runner = new Runner(agent, targetUrl, numberOfWorkers, numberOfConnections, sendDurationInSeconds, transportType);
try
{
await runner.RunAsync();
}
catch (Exception ex)
{
return Fail(ex.ToString());
}
return 0;
}
}
}

View File

@ -0,0 +1,60 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.CommandLineUtils;
using static Microsoft.AspNetCore.SignalR.Crankier.Commands.CommandLineUtilities;
namespace Microsoft.AspNetCore.SignalR.Crankier.Commands
{
internal class WorkerCommand
{
public static void Register(CommandLineApplication app)
{
app.Command("worker", cmd =>
{
var agentOption = cmd.Option("--agent <PARENT_PID>", "The process ID of the agent controlling this worker", CommandOptionType.SingleValue);
var waitForDebuggerOption = cmd.Option("--wait-for-debugger", "Provide this flag to have the worker wait for the debugger.", CommandOptionType.NoValue);
cmd.OnExecute(async () =>
{
if (!agentOption.HasValue())
{
return MissingRequiredArg(agentOption);
}
if (!int.TryParse(agentOption.Value(), out var agentPid))
{
return InvalidArg(agentOption);
}
if (waitForDebuggerOption.HasValue())
{
SpinWait.SpinUntil(() => Debugger.IsAttached);
}
return await Execute(agentPid);
});
});
}
private static async Task<int> Execute(int agentPid)
{
try
{
var worker = new Worker(agentPid);
await worker.RunAsync();
}
catch (Exception ex)
{
return Fail(ex.ToString());
}
return 0;
}
}
}

View File

@ -0,0 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public enum ConnectionState
{
Connecting,
Connected,
Reconnecting,
Disconnected,
Faulted,
}
}

View File

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>Microsoft.AspNetCore.SignalR.CranksRevenge</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.CommandLineUtils.Sources" Version="$(MicrosoftExtensionsCommandLineUtilsSourcesPackageVersion)" />
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public interface IAgent
{
Task PongAsync(int id, int value);
Task LogAsync(int id, string text);
Task StatusAsync(int id, StatusInformation statusInformation);
}
}

View File

@ -0,0 +1,16 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public interface IRunner
{
Task PongWorkerAsync(int workerId, int value);
Task LogAgentAsync(string format, params object[] arguments);
Task LogWorkerAsync(int workerId, string format, params object[] arguments);
}
}

View File

@ -0,0 +1,17 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public interface IWorker
{
Task PingAsync(int value);
Task ConnectAsync(string targetAddress, HttpTransportType transportType, int numberOfConnections);
Task StartTestAsync(TimeSpan sendInterval, int sendBytes);
Task StopAsync();
}
}

View File

@ -0,0 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Newtonsoft.Json.Linq;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class Message
{
public string Command { get; set; }
public JToken Value { get; set; }
}
}

View File

@ -0,0 +1,54 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Linq;
using Microsoft.AspNetCore.SignalR.Crankier.Commands;
using Microsoft.Extensions.CommandLineUtils;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class Program
{
public static void Main(string[] args)
{
#if DEBUG
if (args.Any(a => string.Equals(a, "--debug", StringComparison.Ordinal)))
{
args = args.Where(a => !string.Equals(a, "--debug", StringComparison.Ordinal)).ToArray();
Console.WriteLine($"Waiting for debugger. Process ID: {Process.GetCurrentProcess().Id}");
Console.WriteLine("Press ENTER to continue");
Console.ReadLine();
}
#endif
var app = new CommandLineApplication();
app.Description = "Crank's Revenge";
app.HelpOption("-h|--help");
LocalCommand.Register(app);
AgentCommand.Register(app);
WorkerCommand.Register(app);
app.Command("help", cmd =>
{
var commandArgument = cmd.Argument("<COMMAND>", "The command to get help for.");
cmd.OnExecute(() =>
{
app.ShowHelp(commandArgument.Value);
return 0;
});
});
app.OnExecute(() =>
{
app.ShowHelp();
return 0;
});
app.Execute(args);
}
}
}

View File

@ -0,0 +1,104 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Newtonsoft.Json;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class Runner : IRunner
{
private readonly Agent _agent;
private readonly string _targetUrl;
private readonly int _numberOfWorkers;
private readonly int _numberOfConnections;
private readonly int _sendDurationSeconds;
private readonly HttpTransportType _transportType;
public Runner(Agent agent, string targetUrl, int numberOfWorkers, int numberOfConnections, int sendDurationInSeconds, HttpTransportType transportType)
{
_agent = agent;
_targetUrl = targetUrl;
_numberOfWorkers = numberOfWorkers;
_numberOfConnections = numberOfConnections;
_sendDurationSeconds = sendDurationInSeconds;
_transportType = transportType;
}
public async Task RunAsync()
{
_agent.Runner = this;
await _agent.StartWorkersAsync(_targetUrl, _numberOfWorkers, _transportType, _numberOfConnections);
// Begin writing worker status information
var writeStatusCts = new CancellationTokenSource();
var writeStatusTask = WriteConnectionStatusAsync(writeStatusCts.Token);
// Wait until all connections are connected
while (_agent.GetWorkerStatus().Aggregate(0, (state, status) => state + status.Value.ConnectedCount) <
_agent.TotalConnectionsRequested)
{
await Task.Delay(1000);
}
// Stay connected for the duration of the send phase
await Task.Delay(TimeSpan.FromSeconds(_sendDurationSeconds));
// Disconnect
await _agent.StopWorkersAsync();
// Stop writing worker status information
writeStatusCts.Cancel();
await writeStatusTask;
}
private Task WriteConnectionStatusAsync(CancellationToken cancellationToken)
{
return Task.Run(async () =>
{
var peakConnections = 0;
while (!cancellationToken.IsCancellationRequested)
{
var statusDictionary = _agent.GetWorkerStatus();
// Total things up
var status = new StatusInformation();
foreach (var value in statusDictionary.Values)
{
status = status.Add(value);
}
peakConnections = Math.Max(peakConnections, status.ConnectedCount);
status.PeakConnections = peakConnections;
Trace.WriteLine(JsonConvert.SerializeObject(status));
await Task.Delay(1000);
}
});
}
public Task PongWorkerAsync(int workerId, int value)
{
throw new NotImplementedException();
}
public Task LogAgentAsync(string format, params object[] arguments)
{
Trace.WriteLine(string.Format(format, arguments));
return Task.CompletedTask;
}
public Task LogWorkerAsync(int workerId, string format, params object[] arguments)
{
Trace.WriteLine(string.Format("({0}) {1}", workerId, string.Format(format, arguments)));
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,32 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class StatusInformation
{
public int ConnectingCount { get; set; }
public int ConnectedCount { get; set; }
public int DisconnectedCount { get; set; }
public int ReconnectingCount { get; set; }
public int FaultedCount { get; set; }
public int TargetConnectionCount { get; set; }
// Set by agent.
public int PeakConnections { get; set;}
public StatusInformation Add(StatusInformation value)
{
return new StatusInformation()
{
ConnectingCount = ConnectingCount + value.ConnectingCount,
ConnectedCount = ConnectedCount + value.ConnectedCount,
DisconnectedCount = DisconnectedCount + value.DisconnectedCount,
ReconnectingCount = ReconnectingCount + value.ReconnectingCount,
FaultedCount = FaultedCount + value.FaultedCount,
TargetConnectionCount = TargetConnectionCount + value.TargetConnectionCount,
};
}
}
}

View File

@ -0,0 +1,173 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class Worker : IWorker
{
private readonly Process _agentProcess;
private readonly IAgent _agent;
private readonly int _processId;
private readonly ConcurrentBag<Client> _clients;
private readonly CancellationTokenSource _sendStatusCts;
private int _targetConnectionCount;
public Worker(int agentProcessId)
{
_agentProcess = Process.GetProcessById(agentProcessId);
_agent = new AgentSender(new StreamWriter(Console.OpenStandardOutput()));
_processId = Process.GetCurrentProcess().Id;
_clients = new ConcurrentBag<Client>();
_sendStatusCts = new CancellationTokenSource();
}
public async Task RunAsync()
{
_agentProcess.EnableRaisingEvents = true;
_agentProcess.Exited += OnExited;
Log("Worker created");
var receiver = new WorkerReceiver(
new StreamReader(Console.OpenStandardInput()),
this);
receiver.Start();
await SendStatusUpdateAsync(_sendStatusCts.Token);
receiver.Stop();
}
public async Task PingAsync(int value)
{
Log("Worker received ping command with value {0}.", value);
await _agent.PongAsync(_processId, value);
Log("Worker sent pong command with value {0}.", value);
}
public async Task ConnectAsync(string targetAddress, HttpTransportType transportType, int numberOfConnections)
{
Log("Worker received connect command with target address {0} and number of connections {1}", targetAddress, numberOfConnections);
_targetConnectionCount += numberOfConnections;
for (var count = 0; count < numberOfConnections; count++)
{
var client = new Client();
_clients.Add(client);
await client.CreateAndStartConnectionAsync(targetAddress, transportType);
}
Log("Connections connected succesfully");
}
public Task StartTestAsync(TimeSpan sendInterval, int sendBytes)
{
Log("Worker received start test command with interval {0} and message size {1}.", sendInterval, sendBytes);
foreach (var client in _clients)
{
client.StartTest(sendBytes, sendInterval);
}
Log("Test started succesfully");
return Task.CompletedTask;
}
public Task StopAsync()
{
Log("Worker received stop command");
_targetConnectionCount = 0;
while (!_clients.IsEmpty)
{
if (_clients.TryTake(out var client))
{
client.StopConnectionAsync();
}
}
_sendStatusCts.Cancel();
Log("Connections stopped succesfully");
_targetConnectionCount = 0;
return Task.CompletedTask;
}
private void OnExited(object sender, EventArgs args)
{
Environment.Exit(0);
}
private void Log(string format, params object[] arguments)
{
_agent.LogAsync(_processId, string.Format(format, arguments));
}
private async Task SendStatusUpdateAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var connectedCount = 0;
var connectingCount = 0;
var disconnectedCount = 0;
var reconnectingCount = 0;
var faultedCount = 0;
foreach (var client in _clients)
{
switch (client.State)
{
case ConnectionState.Connecting:
connectingCount++;
break;
case ConnectionState.Connected:
connectedCount++;
break;
case ConnectionState.Disconnected:
disconnectedCount++;
break;
case ConnectionState.Reconnecting:
reconnectingCount++;
break;
case ConnectionState.Faulted:
faultedCount++;
break;
}
}
await _agent.StatusAsync(
_processId,
new StatusInformation
{
ConnectingCount = connectingCount,
ConnectedCount = connectedCount,
DisconnectedCount = disconnectedCount,
ReconnectingCount = reconnectingCount,
TargetConnectionCount = _targetConnectionCount,
FaultedCount = faultedCount,
}
);
// Sending once per 5 seconds to avoid overloading the Test Controller
try
{
await Task.Delay(5000, cancellationToken);
}
catch (TaskCanceledException)
{
}
}
}
}
}

View File

@ -0,0 +1,18 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class WorkerHeartbeatInformation
{
public int Id { get; set; }
public int ConnectedCount { get; set; }
public int DisconnectedCount { get; set; }
public int ReconnectingCount { get; set; }
public int TargetConnectionCount { get; set; }
}
}

View File

@ -0,0 +1,78 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Newtonsoft.Json;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class WorkerReceiver
{
private readonly StreamReader _reader;
private readonly IWorker _worker;
private CancellationTokenSource _receiveMessageCts;
public WorkerReceiver(StreamReader reader, IWorker worker)
{
_reader = reader;
_worker = worker;
}
public void Start()
{
if (_receiveMessageCts != null)
{
_receiveMessageCts.Cancel();
}
_receiveMessageCts = new CancellationTokenSource();
Task.Run(async () =>
{
while (!_receiveMessageCts.Token.IsCancellationRequested)
{
var messageString = await _reader.ReadLineAsync();
try
{
var message = JsonConvert.DeserializeObject<Message>(messageString);
switch (message.Command.ToLowerInvariant())
{
case "ping":
await _worker.PingAsync(
message.Value["Value"].ToObject<int>());
break;
case "connect":
await _worker.ConnectAsync(
message.Value["TargetAddress"].ToObject<string>(),
message.Value["TransportType"].ToObject<HttpTransportType>(),
message.Value["NumberOfConnections"].ToObject<int>());
break;
case "starttest":
await _worker.StartTestAsync(
TimeSpan.FromMilliseconds(message.Value.Value<double>("SendInterval")),
message.Value["SendBytes"].ToObject<int>());
break;
case "stop":
await _worker.StopAsync();
break;
}
}
catch (Exception ex)
{
Trace.WriteLine(ex.Message);
}
}
});
}
public void Stop()
{
_receiveMessageCts.Cancel();
}
}
}

View File

@ -0,0 +1,70 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Microsoft.AspNetCore.SignalR.Crankier
{
public class WorkerSender : IWorker
{
private readonly StreamWriter _outputStreamWriter;
public WorkerSender(StreamWriter outputStreamWriter)
{
_outputStreamWriter = outputStreamWriter;
}
public async Task PingAsync(int value)
{
await Send("ping", JToken.FromObject(
new
{
Value = value
}));
}
public async Task ConnectAsync(string targetAddress, HttpTransportType transportType, int numberOfConnections)
{
await Send("connect", JToken.FromObject(
new
{
TargetAddress = targetAddress,
TransportType = transportType,
NumberOfConnections = numberOfConnections
}));
}
public async Task StartTestAsync(TimeSpan sendInterval, int sendBytes)
{
var parameters = new
{
SendInterval = sendInterval.TotalMilliseconds,
SendBytes = sendBytes
};
await Send("starttest", JToken.FromObject(parameters));
}
public async Task StopAsync()
{
await Send("stop", null);
}
private async Task Send(string method, JToken parameters)
{
await _outputStreamWriter.WriteLineAsync(
JsonConvert.SerializeObject(new Message()
{
Command = method,
Value = parameters
}));
await _outputStreamWriter.FlushAsync();
}
}
}