Make stream buffer capacity configurable (#9877)

This commit is contained in:
Mikael Mengistu 2019-05-07 17:13:07 -07:00 committed by GitHub
parent 11516c9272
commit a26c4e936b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 112 additions and 24 deletions

View File

@ -205,7 +205,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public static ChannelReader<string> StreamEcho(ChannelReader<string> source)
{
var output = Channel.CreateUnbounded<string>();
_ = Task.Run(async () => {
_ = Task.Run(async () =>
{
try
{
while (await source.WaitToReadAsync())

View File

@ -29,13 +29,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public static MockHubConnectionContext CreateMock(ConnectionContext connection)
{
return new MockHubConnectionContext(connection, TimeSpan.FromSeconds(15), NullLoggerFactory.Instance, TimeSpan.FromSeconds(15));
return new MockHubConnectionContext(connection, TimeSpan.FromSeconds(15), NullLoggerFactory.Instance, TimeSpan.FromSeconds(15), streamBufferCapacity: 10);
}
public class MockHubConnectionContext : HubConnectionContext
{
public MockHubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval)
: base(connectionContext, keepAliveInterval, loggerFactory, clientTimeoutInterval)
public MockHubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval, int streamBufferCapacity)
: base(connectionContext, keepAliveInterval, loggerFactory, clientTimeoutInterval, streamBufferCapacity)
{
}

View File

@ -129,6 +129,7 @@ namespace Microsoft.AspNetCore.SignalR
{
public HubConnectionContext(Microsoft.AspNetCore.Connections.ConnectionContext connectionContext, System.TimeSpan keepAliveInterval, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) { }
public HubConnectionContext(Microsoft.AspNetCore.Connections.ConnectionContext connectionContext, System.TimeSpan keepAliveInterval, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, System.TimeSpan clientTimeoutInterval) { }
public HubConnectionContext(Microsoft.AspNetCore.Connections.ConnectionContext connectionContext, System.TimeSpan keepAliveInterval, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, System.TimeSpan clientTimeoutInterval, int streamBufferCapacity) { }
public virtual System.Threading.CancellationToken ConnectionAborted { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public virtual string ConnectionId { get { throw null; } }
public virtual Microsoft.AspNetCore.Http.Features.IFeatureCollection Features { get { throw null; } }
@ -202,6 +203,7 @@ namespace Microsoft.AspNetCore.SignalR
public System.TimeSpan? HandshakeTimeout { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public System.TimeSpan? KeepAliveInterval { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public long? MaximumReceiveMessageSize { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public int? StreamBufferCapacity { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public System.Collections.Generic.IList<string> SupportedProtocols { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
}
public partial class HubOptions<THub> : Microsoft.AspNetCore.SignalR.HubOptions where THub : Microsoft.AspNetCore.SignalR.Hub

View File

@ -42,6 +42,16 @@ namespace Microsoft.AspNetCore.SignalR
private ReadOnlyMemory<byte> _cachedPingMessage;
private bool _clientTimeoutActive;
private bool _connectedAborted;
private int _streamBufferCapacity;
/// <summary>
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
/// </summary>
/// <param name="connectionContext">The underlying <see cref="ConnectionContext"/>.</param>
/// <param name="keepAliveInterval">The keep alive interval. If no messages are sent by the server in this interval, a Ping message will be sent.</param>
/// <param name="loggerFactory">The logger factory.</param>
public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory)
: this(connectionContext, keepAliveInterval, loggerFactory, HubOptionsSetup.DefaultClientTimeoutInterval, HubOptionsSetup.DefaultStreamBufferCapacity) { }
/// <summary>
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
@ -51,12 +61,24 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="loggerFactory">The logger factory.</param>
/// <param name="clientTimeoutInterval">Clients we haven't heard from in this interval are assumed to have disconnected.</param>
public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval)
: this(connectionContext, keepAliveInterval, loggerFactory, clientTimeoutInterval, HubOptionsSetup.DefaultStreamBufferCapacity) { }
/// <summary>
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
/// </summary>
/// <param name="connectionContext">The underlying <see cref="ConnectionContext"/>.</param>
/// <param name="keepAliveInterval">The keep alive interval. If no messages are sent by the server in this interval, a Ping message will be sent.</param>
/// <param name="loggerFactory">The logger factory.</param>
/// <param name="clientTimeoutInterval">Clients we haven't heard from in this interval are assumed to have disconnected.</param>
/// <param name="streamBufferCapacity">The buffer size for client upload streams</param>
public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory, TimeSpan clientTimeoutInterval, int streamBufferCapacity)
{
_connectionContext = connectionContext;
_logger = loggerFactory.CreateLogger<HubConnectionContext>();
ConnectionAborted = _connectionAbortedTokenSource.Token;
_keepAliveInterval = keepAliveInterval.Ticks;
_clientTimeoutInterval = clientTimeoutInterval.Ticks;
_streamBufferCapacity = streamBufferCapacity;
}
internal StreamTracker StreamTracker
@ -66,19 +88,12 @@ namespace Microsoft.AspNetCore.SignalR
// lazy for performance reasons
if (_streamTracker == null)
{
_streamTracker = new StreamTracker();
_streamTracker = new StreamTracker(_streamBufferCapacity);
}
return _streamTracker;
}
}
/// <summary>
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
/// </summary>
/// <param name="connectionContext">The underlying <see cref="ConnectionContext"/>.</param>
/// <param name="keepAliveInterval">The keep alive interval. If no messages are sent by the server in this interval, a Ping message will be sent.</param>
/// <param name="loggerFactory">The logger factory.</param>
public HubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory)
: this(connectionContext, keepAliveInterval, loggerFactory, HubOptionsSetup.DefaultClientTimeoutInterval) { }
/// <summary>
/// Gets a <see cref="CancellationToken"/> that notifies when the connection is aborted.

View File

@ -73,6 +73,7 @@ namespace Microsoft.AspNetCore.SignalR
var keepAlive = _hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval;
var clientTimeout = _hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval;
var handshakeTimeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout;
var streamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity;
var supportedProtocols = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols;
if (supportedProtocols != null && supportedProtocols.Count == 0)
@ -82,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR
Log.ConnectedStarting(_logger);
var connectionContext = new HubConnectionContext(connection, keepAlive, _loggerFactory, clientTimeout);
var connectionContext = new HubConnectionContext(connection, keepAlive, _loggerFactory, clientTimeout, streamBufferCapacity);
var resolvedSupportedProtocols = (supportedProtocols as IReadOnlyList<string>) ?? supportedProtocols.ToList();
if (!await connectionContext.HandshakeAsync(handshakeTimeout, resolvedSupportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors))

View File

@ -17,17 +17,17 @@ namespace Microsoft.AspNetCore.SignalR
// for all available protocols.
/// <summary>
/// Gets or sets the interval used by the server to timeout incoming handshake requests by clients.
/// Gets or sets the interval used by the server to timeout incoming handshake requests by clients. The default timeout is 15 seconds
/// </summary>
public TimeSpan? HandshakeTimeout { get; set; } = null;
/// <summary>
/// Gets or sets the interval used by the server to send keep alive pings to connected clients.
/// Gets or sets the interval used by the server to send keep alive pings to connected clients. The default interval is 15 seconds.
/// </summary>
public TimeSpan? KeepAliveInterval { get; set; } = null;
/// <summary>
/// Gets or sets the time window clients have to send a message before the server closes the connection.
/// Gets or sets the time window clients have to send a message before the server closes the connection. The default timeout is 30 seconds.
/// </summary>
public TimeSpan? ClientTimeoutInterval { get; set; } = null;
@ -46,5 +46,10 @@ namespace Microsoft.AspNetCore.SignalR
/// Detailed error messages include details from exceptions thrown on the server.
/// </summary>
public bool? EnableDetailedErrors { get; set; } = null;
/// <summary>
/// Gets or sets the max buffer size for client upload streams. The default size is 10.
/// </summary>
public int? StreamBufferCapacity { get; set; } = null;
}
}

View File

@ -18,6 +18,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal
internal const int DefaultMaximumMessageSize = 32 * 1024;
internal const int DefaultStreamBufferCapacity = 10;
private readonly List<string> _defaultProtocols = new List<string>();
public HubOptionsSetup(IEnumerable<IHubProtocol> protocols)
@ -52,6 +54,11 @@ namespace Microsoft.AspNetCore.SignalR.Internal
options.SupportedProtocols = new List<string>();
}
if (options.StreamBufferCapacity == null)
{
options.StreamBufferCapacity = DefaultStreamBufferCapacity;
}
foreach (var protocol in _defaultProtocols)
{
options.SupportedProtocols.Add(protocol);

View File

@ -16,14 +16,20 @@ namespace Microsoft.AspNetCore.SignalR
internal class StreamTracker
{
private static readonly MethodInfo _buildConverterMethod = typeof(StreamTracker).GetMethods(BindingFlags.NonPublic | BindingFlags.Static).Single(m => m.Name.Equals("BuildStream"));
private readonly object[] _streamConverterArgs;
private ConcurrentDictionary<string, IStreamConverter> _lookup = new ConcurrentDictionary<string, IStreamConverter>();
public StreamTracker(int streamBufferCapacity)
{
_streamConverterArgs = new object[] { streamBufferCapacity };
}
/// <summary>
/// Creates a new stream and returns the ChannelReader for it as an object.
/// </summary>
public object AddStream(string streamId, Type itemType, Type targetType)
{
var newConverter = (IStreamConverter)_buildConverterMethod.MakeGenericMethod(itemType).Invoke(null, Array.Empty<object>());
var newConverter = (IStreamConverter)_buildConverterMethod.MakeGenericMethod(itemType).Invoke(null, _streamConverterArgs);
_lookup[streamId] = newConverter;
return newConverter.GetReaderAsObject(targetType);
}
@ -71,9 +77,9 @@ namespace Microsoft.AspNetCore.SignalR
return true;
}
private static IStreamConverter BuildStream<T>()
private static IStreamConverter BuildStream<T>(int streamBufferCapacity)
{
return new ChannelConverter<T>();
return new ChannelConverter<T>(streamBufferCapacity);
}
private interface IStreamConverter
@ -88,11 +94,9 @@ namespace Microsoft.AspNetCore.SignalR
{
private Channel<T> _channel;
public ChannelConverter()
public ChannelConverter(int streamBufferCapacity)
{
// TODO: Make this configurable or figure out a good limit
// https://github.com/aspnet/AspNetCore/issues/4399
_channel = Channel.CreateBounded<T>(10);
_channel = Channel.CreateBounded<T>(streamBufferCapacity);
}
public Type GetItemType()

View File

@ -80,6 +80,20 @@ namespace Microsoft.AspNetCore.SignalR.Tests
Assert.Equal(1, serviceProvider.GetRequiredService<IOptions<HubOptions>>().Value.SupportedProtocols.Count);
Assert.Equal(0, serviceProvider.GetRequiredService<IOptions<HubOptions<CustomHub>>>().Value.SupportedProtocols.Count);
}
[Fact]
public void StreamBufferCapacityGetSet()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddSignalR().AddHubOptions<CustomHub>(options =>
{
options.StreamBufferCapacity = 42;
});
var serviceProvider = serviceCollection.BuildServiceProvider();
Assert.Equal(42, serviceProvider.GetRequiredService<IOptions<HubOptions<CustomHub>>>().Value.StreamBufferCapacity);
}
}
public class CustomHub : Hub

View File

@ -196,6 +196,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests
return sb.ToString();
}
public async Task StreamDontRead(ChannelReader<string> source)
{
while (await source.WaitToReadAsync())
{
}
}
public async Task<int> StreamingSum(ChannelReader<int> source)
{
var total = 0;

View File

@ -2911,6 +2911,37 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task StreamUploadBufferCapacityBlocksOtherInvocations()
{
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services =>
{
services.Configure<HubOptions>(options =>
{
options.StreamBufferCapacity = 1;
});
});
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
await client.BeginUploadStreamAsync("invocationId", nameof(MethodHub.StreamDontRead), new[] { "id" }, Array.Empty<object>()).OrTimeout();
foreach (var letter in new[] { "A", "B", "C", "D", "E" })
{
await client.SendHubMessageAsync(new StreamItemMessage("id", letter)).OrTimeout();
}
var ex = await Assert.ThrowsAsync<TimeoutException>(async () =>
{
await client.SendInvocationAsync("Echo", "test");
var result = (CompletionMessage)await client.ReadAsync().OrTimeout(5000);
});
}
}
[Fact]
public async Task UploadStringsToConcat()
{