From a26c4e936b1a775b890a8fb2d33fdefcece60518 Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Tue, 7 May 2019 17:13:07 -0700 Subject: [PATCH] Make stream buffer capacity configurable (#9877) --- .../Client/test/FunctionalTests/Hubs.cs | 3 +- .../Tests.Utils/HubConnectionContextUtils.cs | 6 ++-- ...t.AspNetCore.SignalR.Core.netcoreapp3.0.cs | 2 ++ .../server/Core/src/HubConnectionContext.cs | 33 ++++++++++++++----- .../server/Core/src/HubConnectionHandler.cs | 3 +- src/SignalR/server/Core/src/HubOptions.cs | 11 +++++-- .../Core/src/Internal/HubOptionsSetup.cs | 7 ++++ src/SignalR/server/Core/src/StreamTracker.cs | 18 ++++++---- .../server/SignalR/test/AddSignalRTests.cs | 14 ++++++++ .../HubConnectionHandlerTestUtils/Hubs.cs | 8 +++++ .../SignalR/test/HubConnectionHandlerTests.cs | 31 +++++++++++++++++ 11 files changed, 112 insertions(+), 24 deletions(-) diff --git a/src/SignalR/clients/csharp/Client/test/FunctionalTests/Hubs.cs b/src/SignalR/clients/csharp/Client/test/FunctionalTests/Hubs.cs index 6843a4cb64..eb3a08fe2c 100644 --- a/src/SignalR/clients/csharp/Client/test/FunctionalTests/Hubs.cs +++ b/src/SignalR/clients/csharp/Client/test/FunctionalTests/Hubs.cs @@ -205,7 +205,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests public static ChannelReader StreamEcho(ChannelReader source) { var output = Channel.CreateUnbounded(); - _ = Task.Run(async () => { + _ = Task.Run(async () => + { try { while (await source.WaitToReadAsync()) diff --git a/src/SignalR/common/testassets/Tests.Utils/HubConnectionContextUtils.cs b/src/SignalR/common/testassets/Tests.Utils/HubConnectionContextUtils.cs index bbbfb1db96..79fb05425c 100644 --- a/src/SignalR/common/testassets/Tests.Utils/HubConnectionContextUtils.cs +++ b/src/SignalR/common/testassets/Tests.Utils/HubConnectionContextUtils.cs @@ -29,13 +29,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests public static MockHubConnectionContext CreateMock(ConnectionContext connection) { - return new MockHubConnectionContext(connection, TimeSpan.FromSeconds(15), NullLoggerFactory.Instance, TimeSpan.FromSeconds(15)); + return new MockHubConnectionContext(connection, TimeSpan.FromSeconds(15), NullLoggerFactory.Instance, TimeSpan.FromSeconds(15), streamBufferCapacity: 10); } public class MockHubConnectionContext : HubConnectionContext { - public MockHubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval) - : base(connectionContext, keepAliveInterval, loggerFactory, clientTimeoutInterval) + public MockHubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval, int streamBufferCapacity) + : base(connectionContext, keepAliveInterval, loggerFactory, clientTimeoutInterval, streamBufferCapacity) { } diff --git a/src/SignalR/server/Core/ref/Microsoft.AspNetCore.SignalR.Core.netcoreapp3.0.cs b/src/SignalR/server/Core/ref/Microsoft.AspNetCore.SignalR.Core.netcoreapp3.0.cs index f73ec16dce..04e24eae47 100644 --- a/src/SignalR/server/Core/ref/Microsoft.AspNetCore.SignalR.Core.netcoreapp3.0.cs +++ b/src/SignalR/server/Core/ref/Microsoft.AspNetCore.SignalR.Core.netcoreapp3.0.cs @@ -129,6 +129,7 @@ namespace Microsoft.AspNetCore.SignalR { public HubConnectionContext(Microsoft.AspNetCore.Connections.ConnectionContext connectionContext, System.TimeSpan keepAliveInterval, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) { } public HubConnectionContext(Microsoft.AspNetCore.Connections.ConnectionContext connectionContext, System.TimeSpan keepAliveInterval, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, System.TimeSpan clientTimeoutInterval) { } + public HubConnectionContext(Microsoft.AspNetCore.Connections.ConnectionContext connectionContext, System.TimeSpan keepAliveInterval, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, System.TimeSpan clientTimeoutInterval, int streamBufferCapacity) { } public virtual System.Threading.CancellationToken ConnectionAborted { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } } public virtual string ConnectionId { get { throw null; } } public virtual Microsoft.AspNetCore.Http.Features.IFeatureCollection Features { get { throw null; } } @@ -202,6 +203,7 @@ namespace Microsoft.AspNetCore.SignalR public System.TimeSpan? HandshakeTimeout { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } } public System.TimeSpan? KeepAliveInterval { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } } public long? MaximumReceiveMessageSize { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } } + public int? StreamBufferCapacity { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } } public System.Collections.Generic.IList SupportedProtocols { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } } } public partial class HubOptions : Microsoft.AspNetCore.SignalR.HubOptions where THub : Microsoft.AspNetCore.SignalR.Hub diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 9e649f1b4e..bc061336ae 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -42,6 +42,16 @@ namespace Microsoft.AspNetCore.SignalR private ReadOnlyMemory _cachedPingMessage; private bool _clientTimeoutActive; private bool _connectedAborted; + private int _streamBufferCapacity; + + /// + /// Initializes a new instance of the class. + /// + /// The underlying . + /// The keep alive interval. If no messages are sent by the server in this interval, a Ping message will be sent. + /// The logger factory. + public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory) + : this(connectionContext, keepAliveInterval, loggerFactory, HubOptionsSetup.DefaultClientTimeoutInterval, HubOptionsSetup.DefaultStreamBufferCapacity) { } /// /// Initializes a new instance of the class. @@ -51,12 +61,24 @@ namespace Microsoft.AspNetCore.SignalR /// The logger factory. /// Clients we haven't heard from in this interval are assumed to have disconnected. public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval) + : this(connectionContext, keepAliveInterval, loggerFactory, clientTimeoutInterval, HubOptionsSetup.DefaultStreamBufferCapacity) { } + + /// + /// Initializes a new instance of the class. + /// + /// The underlying . + /// The keep alive interval. If no messages are sent by the server in this interval, a Ping message will be sent. + /// The logger factory. + /// Clients we haven't heard from in this interval are assumed to have disconnected. + /// The buffer size for client upload streams + public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval, int streamBufferCapacity) { _connectionContext = connectionContext; _logger = loggerFactory.CreateLogger(); ConnectionAborted = _connectionAbortedTokenSource.Token; _keepAliveInterval = keepAliveInterval.Ticks; _clientTimeoutInterval = clientTimeoutInterval.Ticks; + _streamBufferCapacity = streamBufferCapacity; } internal StreamTracker StreamTracker @@ -66,19 +88,12 @@ namespace Microsoft.AspNetCore.SignalR // lazy for performance reasons if (_streamTracker == null) { - _streamTracker = new StreamTracker(); + _streamTracker = new StreamTracker(_streamBufferCapacity); } + return _streamTracker; } } - /// - /// Initializes a new instance of the class. - /// - /// The underlying . - /// The keep alive interval. If no messages are sent by the server in this interval, a Ping message will be sent. - /// The logger factory. - public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory) - : this(connectionContext, keepAliveInterval, loggerFactory, HubOptionsSetup.DefaultClientTimeoutInterval) { } /// /// Gets a that notifies when the connection is aborted. diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index 30c0073547..708d2b56bd 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -73,6 +73,7 @@ namespace Microsoft.AspNetCore.SignalR var keepAlive = _hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval; var clientTimeout = _hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval; var handshakeTimeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout; + var streamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity; var supportedProtocols = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols; if (supportedProtocols != null && supportedProtocols.Count == 0) @@ -82,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR Log.ConnectedStarting(_logger); - var connectionContext = new HubConnectionContext(connection, keepAlive, _loggerFactory, clientTimeout); + var connectionContext = new HubConnectionContext(connection, keepAlive, _loggerFactory, clientTimeout, streamBufferCapacity); var resolvedSupportedProtocols = (supportedProtocols as IReadOnlyList) ?? supportedProtocols.ToList(); if (!await connectionContext.HandshakeAsync(handshakeTimeout, resolvedSupportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors)) diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs index a4238750cb..7c49fd2957 100644 --- a/src/SignalR/server/Core/src/HubOptions.cs +++ b/src/SignalR/server/Core/src/HubOptions.cs @@ -17,17 +17,17 @@ namespace Microsoft.AspNetCore.SignalR // for all available protocols. /// - /// Gets or sets the interval used by the server to timeout incoming handshake requests by clients. + /// Gets or sets the interval used by the server to timeout incoming handshake requests by clients. The default timeout is 15 seconds /// public TimeSpan? HandshakeTimeout { get; set; } = null; /// - /// Gets or sets the interval used by the server to send keep alive pings to connected clients. + /// Gets or sets the interval used by the server to send keep alive pings to connected clients. The default interval is 15 seconds. /// public TimeSpan? KeepAliveInterval { get; set; } = null; /// - /// Gets or sets the time window clients have to send a message before the server closes the connection. + /// Gets or sets the time window clients have to send a message before the server closes the connection. The default timeout is 30 seconds. /// public TimeSpan? ClientTimeoutInterval { get; set; } = null; @@ -46,5 +46,10 @@ namespace Microsoft.AspNetCore.SignalR /// Detailed error messages include details from exceptions thrown on the server. /// public bool? EnableDetailedErrors { get; set; } = null; + + /// + /// Gets or sets the max buffer size for client upload streams. The default size is 10. + /// + public int? StreamBufferCapacity { get; set; } = null; } } diff --git a/src/SignalR/server/Core/src/Internal/HubOptionsSetup.cs b/src/SignalR/server/Core/src/Internal/HubOptionsSetup.cs index 0692a4a74b..ad4f9e9ec4 100644 --- a/src/SignalR/server/Core/src/Internal/HubOptionsSetup.cs +++ b/src/SignalR/server/Core/src/Internal/HubOptionsSetup.cs @@ -18,6 +18,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal internal const int DefaultMaximumMessageSize = 32 * 1024; + internal const int DefaultStreamBufferCapacity = 10; + private readonly List _defaultProtocols = new List(); public HubOptionsSetup(IEnumerable protocols) @@ -52,6 +54,11 @@ namespace Microsoft.AspNetCore.SignalR.Internal options.SupportedProtocols = new List(); } + if (options.StreamBufferCapacity == null) + { + options.StreamBufferCapacity = DefaultStreamBufferCapacity; + } + foreach (var protocol in _defaultProtocols) { options.SupportedProtocols.Add(protocol); diff --git a/src/SignalR/server/Core/src/StreamTracker.cs b/src/SignalR/server/Core/src/StreamTracker.cs index 1c5efc2871..fc4833abbf 100644 --- a/src/SignalR/server/Core/src/StreamTracker.cs +++ b/src/SignalR/server/Core/src/StreamTracker.cs @@ -16,14 +16,20 @@ namespace Microsoft.AspNetCore.SignalR internal class StreamTracker { private static readonly MethodInfo _buildConverterMethod = typeof(StreamTracker).GetMethods(BindingFlags.NonPublic | BindingFlags.Static).Single(m => m.Name.Equals("BuildStream")); + private readonly object[] _streamConverterArgs; private ConcurrentDictionary _lookup = new ConcurrentDictionary(); + public StreamTracker(int streamBufferCapacity) + { + _streamConverterArgs = new object[] { streamBufferCapacity }; + } + /// /// Creates a new stream and returns the ChannelReader for it as an object. /// public object AddStream(string streamId, Type itemType, Type targetType) { - var newConverter = (IStreamConverter)_buildConverterMethod.MakeGenericMethod(itemType).Invoke(null, Array.Empty()); + var newConverter = (IStreamConverter)_buildConverterMethod.MakeGenericMethod(itemType).Invoke(null, _streamConverterArgs); _lookup[streamId] = newConverter; return newConverter.GetReaderAsObject(targetType); } @@ -71,9 +77,9 @@ namespace Microsoft.AspNetCore.SignalR return true; } - private static IStreamConverter BuildStream() + private static IStreamConverter BuildStream(int streamBufferCapacity) { - return new ChannelConverter(); + return new ChannelConverter(streamBufferCapacity); } private interface IStreamConverter @@ -88,11 +94,9 @@ namespace Microsoft.AspNetCore.SignalR { private Channel _channel; - public ChannelConverter() + public ChannelConverter(int streamBufferCapacity) { - // TODO: Make this configurable or figure out a good limit - // https://github.com/aspnet/AspNetCore/issues/4399 - _channel = Channel.CreateBounded(10); + _channel = Channel.CreateBounded(streamBufferCapacity); } public Type GetItemType() diff --git a/src/SignalR/server/SignalR/test/AddSignalRTests.cs b/src/SignalR/server/SignalR/test/AddSignalRTests.cs index 325c293826..871a16ffa3 100644 --- a/src/SignalR/server/SignalR/test/AddSignalRTests.cs +++ b/src/SignalR/server/SignalR/test/AddSignalRTests.cs @@ -80,6 +80,20 @@ namespace Microsoft.AspNetCore.SignalR.Tests Assert.Equal(1, serviceProvider.GetRequiredService>().Value.SupportedProtocols.Count); Assert.Equal(0, serviceProvider.GetRequiredService>>().Value.SupportedProtocols.Count); } + + [Fact] + public void StreamBufferCapacityGetSet() + { + var serviceCollection = new ServiceCollection(); + + serviceCollection.AddSignalR().AddHubOptions(options => + { + options.StreamBufferCapacity = 42; + }); + + var serviceProvider = serviceCollection.BuildServiceProvider(); + Assert.Equal(42, serviceProvider.GetRequiredService>>().Value.StreamBufferCapacity); + } } public class CustomHub : Hub diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs index 9dfa8ad7a4..68d02497de 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs @@ -196,6 +196,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests return sb.ToString(); } + public async Task StreamDontRead(ChannelReader source) + { + while (await source.WaitToReadAsync()) + { + } + } + + public async Task StreamingSum(ChannelReader source) { var total = 0; diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index f74aee9a56..d78cf6f668 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2911,6 +2911,37 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } + [Fact] + public async Task StreamUploadBufferCapacityBlocksOtherInvocations() + { + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => + { + services.Configure(options => + { + options.StreamBufferCapacity = 1; + }); + }); + + var connectionHandler = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); + await client.BeginUploadStreamAsync("invocationId", nameof(MethodHub.StreamDontRead), new[] { "id" }, Array.Empty()).OrTimeout(); + + foreach (var letter in new[] { "A", "B", "C", "D", "E" }) + { + await client.SendHubMessageAsync(new StreamItemMessage("id", letter)).OrTimeout(); + } + + var ex = await Assert.ThrowsAsync(async () => + { + await client.SendInvocationAsync("Echo", "test"); + var result = (CompletionMessage)await client.ReadAsync().OrTimeout(5000); + }); + } + } + [Fact] public async Task UploadStringsToConcat() {