diff --git a/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs b/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs
index 6850989e3d..62ad679754 100644
--- a/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs
+++ b/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs
@@ -47,6 +47,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
var contextOptions = new HubConnectionContextOptions()
{
KeepAliveInterval = TimeSpan.Zero,
+ StreamBufferCapacity = 10,
};
_connectionContext = new NoErrorHubConnectionContext(connection, contextOptions, NullLoggerFactory.Instance);
diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs
index 0e98a6ee08..9f94b6a8b0 100644
--- a/src/SignalR/server/Core/src/HubConnectionContext.cs
+++ b/src/SignalR/server/Core/src/HubConnectionContext.cs
@@ -73,6 +73,13 @@ namespace Microsoft.AspNetCore.SignalR
_systemClock = contextOptions.SystemClock ?? new SystemClock();
_lastSendTimeStamp = _systemClock.UtcNowTicks;
+
+ // We'll be avoiding using the semaphore when the limit is set to 1, so no need to allocate it
+ var maxInvokeLimit = contextOptions.MaximumParallelInvocations;
+ if (maxInvokeLimit != 1)
+ {
+ ActiveInvocationLimit = new SemaphoreSlim(maxInvokeLimit, maxInvokeLimit);
+ }
}
internal StreamTracker StreamTracker
@@ -93,6 +100,8 @@ namespace Microsoft.AspNetCore.SignalR
internal Exception? CloseException { get; private set; }
+ internal SemaphoreSlim? ActiveInvocationLimit { get; }
+
///
/// Gets a that notifies when the connection is aborted.
///
diff --git a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs
index 4626d195cc..54ada054cf 100644
--- a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs
+++ b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs
@@ -32,5 +32,10 @@ namespace Microsoft.AspNetCore.SignalR
public long? MaximumReceiveMessageSize { get; set; }
internal ISystemClock SystemClock { get; set; } = default!;
+
+ ///
+ /// Gets or sets the maximum parallel hub method invocations.
+ ///
+ public int MaximumParallelInvocations { get; set; } = 1;
}
}
diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs
index 40745494ad..d2d77e0fe6 100644
--- a/src/SignalR/server/Core/src/HubConnectionHandler.cs
+++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs
@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
+using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
@@ -31,6 +32,7 @@ namespace Microsoft.AspNetCore.SignalR
private readonly HubDispatcher _dispatcher;
private readonly bool _enableDetailedErrors;
private readonly long? _maximumMessageSize;
+ private readonly int _maxParallelInvokes;
// Internal for testing
internal ISystemClock SystemClock { get; set; } = new SystemClock();
@@ -70,6 +72,7 @@ namespace Microsoft.AspNetCore.SignalR
{
_maximumMessageSize = _hubOptions.MaximumReceiveMessageSize;
_enableDetailedErrors = _hubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
+ _maxParallelInvokes = _hubOptions.MaximumParallelInvocationsPerClient;
if (_hubOptions.HubFilters != null)
{
@@ -80,6 +83,7 @@ namespace Microsoft.AspNetCore.SignalR
{
_maximumMessageSize = _globalHubOptions.MaximumReceiveMessageSize;
_enableDetailedErrors = _globalHubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
+ _maxParallelInvokes = _globalHubOptions.MaximumParallelInvocationsPerClient;
if (_globalHubOptions.HubFilters != null)
{
@@ -116,6 +120,7 @@ namespace Microsoft.AspNetCore.SignalR
StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity,
MaximumReceiveMessageSize = _maximumMessageSize,
SystemClock = SystemClock,
+ MaximumParallelInvocations = _maxParallelInvokes,
};
Log.ConnectedStarting(_logger);
@@ -235,7 +240,6 @@ namespace Microsoft.AspNetCore.SignalR
var protocol = connection.Protocol;
connection.BeginClientTimeout();
-
var binder = new HubConnectionBinder(_dispatcher, connection);
while (true)
@@ -258,8 +262,9 @@ namespace Microsoft.AspNetCore.SignalR
{
while (protocol.TryParseMessage(ref buffer, binder, out var message))
{
- messageReceived = true;
connection.StopClientTimeout();
+ // This lets us know the timeout has stopped and we need to re-enable it after dispatching the message
+ messageReceived = true;
await _dispatcher.DispatchMessageAsync(connection, message);
}
@@ -286,9 +291,9 @@ namespace Microsoft.AspNetCore.SignalR
if (protocol.TryParseMessage(ref segment, binder, out var message))
{
- messageReceived = true;
connection.StopClientTimeout();
-
+ // This lets us know the timeout has stopped and we need to re-enable it after dispatching the message
+ messageReceived = true;
await _dispatcher.DispatchMessageAsync(connection, message);
}
else if (overLength)
diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs
index a9a889909d..684b8c343b 100644
--- a/src/SignalR/server/Core/src/HubOptions.cs
+++ b/src/SignalR/server/Core/src/HubOptions.cs
@@ -11,6 +11,8 @@ namespace Microsoft.AspNetCore.SignalR
///
public class HubOptions
{
+ private int _maximumParallelInvocationsPerClient = 1;
+
// HandshakeTimeout and KeepAliveInterval are set to null here to help identify when
// local hub options have been set. Global default values are set in HubOptionsSetup.
// SupportedProtocols being null is the true default value, and it represents support
@@ -53,5 +55,23 @@ namespace Microsoft.AspNetCore.SignalR
public int? StreamBufferCapacity { get; set; } = null;
internal List? HubFilters { get; set; }
+
+ ///
+ /// By default a client is only allowed to invoke a single Hub method at a time.
+ /// Changing this property will allow clients to invoke multiple methods at the same time before queueing.
+ ///
+ public int MaximumParallelInvocationsPerClient
+ {
+ get => _maximumParallelInvocationsPerClient;
+ set
+ {
+ if (value < 1)
+ {
+ throw new ArgumentOutOfRangeException(nameof(MaximumParallelInvocationsPerClient));
+ }
+
+ _maximumParallelInvocationsPerClient = value;
+ }
+ }
}
}
diff --git a/src/SignalR/server/Core/src/HubOptionsSetup`T.cs b/src/SignalR/server/Core/src/HubOptionsSetup`T.cs
index a935980e09..1dfae3de0c 100644
--- a/src/SignalR/server/Core/src/HubOptionsSetup`T.cs
+++ b/src/SignalR/server/Core/src/HubOptionsSetup`T.cs
@@ -25,6 +25,7 @@ namespace Microsoft.AspNetCore.SignalR
options.EnableDetailedErrors = _hubOptions.EnableDetailedErrors;
options.MaximumReceiveMessageSize = _hubOptions.MaximumReceiveMessageSize;
options.StreamBufferCapacity = _hubOptions.StreamBufferCapacity;
+ options.MaximumParallelInvocationsPerClient = _hubOptions.MaximumParallelInvocationsPerClient;
options.UserHasSetValues = true;
diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs
index 359c78f2db..e6710f7e31 100644
--- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs
+++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs
@@ -79,6 +79,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
private static readonly Action _invalidHubParameters =
LoggerMessage.Define(LogLevel.Debug, new EventId(22, "InvalidHubParameters"), "Parameters to hub method '{HubMethod}' are incorrect.");
+ private static readonly Action _invocationIdInUse =
+ LoggerMessage.Define(LogLevel.Debug, new EventId(23, "InvocationIdInUse"), "Invocation ID '{InvocationId}' is already in use.");
+
public static void ReceivedHubInvocation(ILogger logger, InvocationMessage invocationMessage)
{
_receivedHubInvocation(logger, invocationMessage, null);
@@ -188,6 +191,11 @@ namespace Microsoft.AspNetCore.SignalR.Internal
{
_invalidHubParameters(logger, hubMethod, exception);
}
+
+ public static void InvocationIdInUse(ILogger logger, string InvocationId)
+ {
+ _invocationIdInUse(logger, InvocationId, null);
+ }
}
}
}
diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs
index 889ee5dfc8..ee6be9f57f 100644
--- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs
+++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs
@@ -147,6 +147,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal
// Messages are dispatched sequentially and will stop other messages from being processed until they complete.
// Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run.
+ // With parallel invokes enabled, messages run sequentially until they go async and then the next message will be allowed to start running.
+
switch (hubMessage)
{
case InvocationBindingFailureMessage bindingFailureMessage:
@@ -229,7 +231,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
connection.StreamTracker.TryComplete(message);
// TODO: Send stream completion message to client when we add it
-
return Task.CompletedTask;
}
@@ -258,7 +259,18 @@ namespace Microsoft.AspNetCore.SignalR.Internal
else
{
bool isStreamCall = descriptor.StreamingParameters != null;
- return Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamResponse, isStreamCall);
+ if (connection.ActiveInvocationLimit != null && !isStreamCall && !isStreamResponse)
+ {
+ return connection.ActiveInvocationLimit.RunAsync(state =>
+ {
+ var (dispatcher, descriptor, connection, invocationMessage) = state;
+ return dispatcher.Invoke(descriptor, connection, invocationMessage, isStreamResponse: false, isStreamCall: false);
+ }, (this, descriptor, connection, hubMethodInvocationMessage));
+ }
+ else
+ {
+ return Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamResponse, isStreamCall);
+ }
}
}
@@ -305,68 +317,16 @@ namespace Microsoft.AspNetCore.SignalR.Internal
InitializeHub(hub, connection);
Task invocation = null;
- CancellationTokenSource cts = null;
var arguments = hubMethodInvocationMessage.Arguments;
+ CancellationTokenSource cts = null;
if (descriptor.HasSyntheticArguments)
{
- // In order to add the synthetic arguments we need a new array because the invocation array is too small (it doesn't know about synthetic arguments)
- arguments = new object[descriptor.OriginalParameterTypes.Count];
-
- var streamPointer = 0;
- var hubInvocationArgumentPointer = 0;
- for (var parameterPointer = 0; parameterPointer < arguments.Length; parameterPointer++)
- {
- if (hubMethodInvocationMessage.Arguments.Length > hubInvocationArgumentPointer &&
- (hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer] == null ||
- descriptor.OriginalParameterTypes[parameterPointer].IsAssignableFrom(hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer].GetType())))
- {
- // The types match so it isn't a synthetic argument, just copy it into the arguments array
- arguments[parameterPointer] = hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer];
- hubInvocationArgumentPointer++;
- }
- else
- {
- if (descriptor.OriginalParameterTypes[parameterPointer] == typeof(CancellationToken))
- {
- cts = CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted);
- arguments[parameterPointer] = cts.Token;
- }
- else if (isStreamCall && ReflectionHelper.IsStreamingType(descriptor.OriginalParameterTypes[parameterPointer], mustBeDirectType: true))
- {
- Log.StartingParameterStream(_logger, hubMethodInvocationMessage.StreamIds[streamPointer]);
- var itemType = descriptor.StreamingParameters[streamPointer];
- arguments[parameterPointer] = connection.StreamTracker.AddStream(hubMethodInvocationMessage.StreamIds[streamPointer],
- itemType, descriptor.OriginalParameterTypes[parameterPointer]);
-
- streamPointer++;
- }
- else
- {
- // This should never happen
- Debug.Assert(false, $"Failed to bind argument of type '{descriptor.OriginalParameterTypes[parameterPointer].Name}' for hub method '{methodExecutor.MethodInfo.Name}'.");
- }
- }
- }
+ ReplaceArguments(descriptor, hubMethodInvocationMessage, isStreamCall, connection, ref arguments, out cts);
}
if (isStreamResponse)
{
- var result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider);
-
- if (result == null)
- {
- Log.InvalidReturnValueFromStreamingMethod(_logger, methodExecutor.MethodInfo.Name);
- await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
- $"The value returned by the streaming method '{methodExecutor.MethodInfo.Name}' is not a ChannelReader<> or IAsyncEnumerable<>.");
- return;
- }
-
- cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted);
- connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts);
- var enumerable = descriptor.FromReturnedStream(result, cts.Token);
-
- Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor);
- _ = StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerable, scope, hubActivator, hub, cts, hubMethodInvocationMessage);
+ _ = StreamAsync(hubMethodInvocationMessage.InvocationId, connection, arguments, scope, hubActivator, hub, cts, hubMethodInvocationMessage, descriptor);
}
else
{
@@ -456,13 +416,45 @@ namespace Microsoft.AspNetCore.SignalR.Internal
return scope.DisposeAsync();
}
- private async Task StreamResultsAsync(string invocationId, HubConnectionContext connection, IAsyncEnumerable