The grand unification of ConnectionContext (#1895)

This change rationalizes the 2 very similar abstractions that exist in Connections.Abstractions, IConnection and ConnectionContext. It also introduces an IConnectionFactory to SignalR that is used to create a new ConnectionContext for a HubConnection.

- HubConnection just completes both ends of the transport pipe instead of calling DisposeAsync.
- Implemented ConnectionContext on HttpConnection and added HttpConnectionFactory
-  Updated tests
This commit is contained in:
David Fowler 2018-04-07 16:19:01 -07:00 committed by GitHub
parent e3da7feab4
commit c1049b722d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 227 additions and 198 deletions

View File

@ -44,16 +44,19 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
_tcs = new TaskCompletionSource<ReadResult>();
_pipe.AddReadResult(new ValueTask<ReadResult>(_tcs.Task));
var connection = new TestConnection();
// prevents keep alive time being activated
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
connection.Transport = _pipe;
var protocol = Protocol == "json" ? (IHubProtocol)new JsonHubProtocol() : new MessagePackHubProtocol();
var hubConnectionBuilder = new HubConnectionBuilder();
hubConnectionBuilder.WithHubProtocol(protocol);
hubConnectionBuilder.WithConnectionFactory(() => connection);
hubConnectionBuilder.WithConnectionFactory(format =>
{
var connection = new DefaultConnectionContext();
// prevents keep alive time being activated
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
connection.Transport = _pipe;
return Task.FromResult<ConnectionContext>(connection);
});
_hubConnection = hubConnectionBuilder.Build();
_hubConnection.StartAsync().GetAwaiter().GetResult();

View File

@ -8,13 +8,10 @@ using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
@ -40,14 +37,16 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
_pipe = new TestDuplexPipe();
var connection = new TestConnection();
// prevents keep alive time being activated
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
connection.Transport = _pipe;
var hubConnectionBuilder = new HubConnectionBuilder();
hubConnectionBuilder.WithHubProtocol(new JsonHubProtocol());
hubConnectionBuilder.WithConnectionFactory(() => connection);
hubConnectionBuilder.WithConnectionFactory(format =>
{
var connection = new DefaultConnectionContext();
// prevents keep alive time being activated
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
connection.Transport = _pipe;
return Task.FromResult<ConnectionContext>(connection);
});
_hubConnection = hubConnectionBuilder.Build();
}
@ -71,26 +70,4 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
public TimeSpan KeepAliveInterval { get; } = TimeSpan.Zero;
}
public class TestConnection : IConnection
{
public Task StartAsync()
{
throw new NotImplementedException();
}
public Task StartAsync(TransferFormat transferFormat)
{
return Task.CompletedTask;
}
public Task DisposeAsync()
{
return Task.CompletedTask;
}
public IDuplexPipe Transport { get; set; }
public IFeatureCollection Features { get; } = new FeatureCollection();
}
}

View File

@ -1,30 +0,0 @@
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared
{
public class TestConnection : IConnection
{
public Task StartAsync()
{
throw new NotImplementedException();
}
public Task StartAsync(TransferFormat transferFormat)
{
return Task.CompletedTask;
}
public Task DisposeAsync()
{
return Task.CompletedTask;
}
public IDuplexPipe Transport { get; set; }
public IFeatureCollection Features { get; } = new FeatureCollection();
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Net;
@ -9,12 +10,11 @@ using Microsoft.AspNetCore.Http.Features;
namespace ClientSample
{
public class TcpConnection : IConnection
public class TcpConnection : ConnectionContext
{
private readonly Socket _socket;
private volatile bool _aborted;
private readonly EndPoint _endPoint;
private IDuplexPipe _transport;
private IDuplexPipe _application;
private SocketSender _sender;
private SocketReceiver _receiver;
@ -28,30 +28,34 @@ namespace ClientSample
_receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool);
}
public IDuplexPipe Transport => _transport;
public override IDuplexPipe Transport { get; set; }
public IFeatureCollection Features { get; } = new FeatureCollection();
public override IFeatureCollection Features { get; } = new FeatureCollection();
public override string ConnectionId { get; set; } = Guid.NewGuid().ToString();
public override IDictionary<object, object> Items { get; set; } = new ConnectionItems();
public Task DisposeAsync()
{
_transport?.Output.Complete();
_transport?.Input.Complete();
Transport?.Output.Complete();
Transport?.Input.Complete();
_socket?.Dispose();
return Task.CompletedTask;
}
public async Task StartAsync()
public async Task<ConnectionContext> StartAsync()
{
await _socket.ConnectAsync(_endPoint);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
_transport = pair.Transport;
Transport = pair.Transport;
_application = pair.Application;
_ = ExecuteAsync();
return this;
}
private async Task ExecuteAsync()

View File

@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public static IHubConnectionBuilder WithEndPoint(this IHubConnectionBuilder builder, EndPoint endPoint)
{
builder.WithConnectionFactory(() => new TcpConnection(endPoint));
builder.WithConnectionFactory(format => new TcpConnection(endPoint).StartAsync());
return builder;
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Http;
@ -19,7 +20,7 @@ using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.Http.Connections.Client
{
public partial class HttpConnection : IConnection
public partial class HttpConnection : ConnectionContext
{
private static readonly TimeSpan HttpClientTimeout = TimeSpan.FromSeconds(120);
#if !NETCOREAPP2_1
@ -44,7 +45,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
public Uri Url { get; }
public IDuplexPipe Transport
public override IDuplexPipe Transport
{
get
{
@ -55,9 +56,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
}
return _transport;
}
set
{
throw new NotSupportedException("The transport pipe isn't settable.");
}
}
public IFeatureCollection Features { get; } = new FeatureCollection();
public override IFeatureCollection Features { get; } = new FeatureCollection();
public override string ConnectionId { get; set; }
public override IDictionary<object, object> Items { get; set; } = new ConnectionItems();
public HttpConnection(Uri url)
: this(url, HttpTransports.All)
@ -109,9 +116,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
_logScope = new ConnectionLogScope();
_scopeDisposable = _logger.BeginScope(_logScope);
}
public Task StartAsync() => StartAsync(TransferFormat.Binary);
public async Task StartAsync(TransferFormat transferFormat)
{
await StartAsyncCore(transferFormat).ForceAsync();
@ -461,4 +466,4 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
return negotiationResponse;
}
}
}
}

View File

@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
private readonly ILogger _logger;
private readonly IHubProtocol _protocol;
private readonly IServiceProvider _serviceProvider;
private readonly Func<IConnection> _connectionFactory;
private readonly IConnectionFactory _connectionFactory;
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
private bool _disposed;
@ -48,7 +48,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public TimeSpan ServerTimeout { get; set; } = DefaultServerTimeout;
public TimeSpan HandshakeTimeout { get; set; } = DefaultHandshakeTimeout;
public HubConnection(Func<IConnection> connectionFactory, IHubProtocol protocol, IServiceProvider serviceProvider, ILoggerFactory loggerFactory)
public HubConnection(IConnectionFactory connectionFactory, IHubProtocol protocol, IServiceProvider serviceProvider, ILoggerFactory loggerFactory)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_protocol = protocol ?? throw new ArgumentNullException(nameof(protocol));
@ -128,8 +128,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.Starting(_logger);
// Start the connection
var connection = _connectionFactory();
await connection.StartAsync(_protocol.TransferFormat);
var connection = await _connectionFactory.ConnectAsync(_protocol.TransferFormat);
var startingConnectionState = new ConnectionState(connection, this);
// From here on, if an error occurs we need to shut down the connection because
@ -144,7 +143,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.ErrorStartingConnection(_logger, ex);
// Can't have any invocations to cancel, we're in the lock.
await startingConnectionState.Connection.DisposeAsync();
Complete(startingConnectionState.Connection);
throw;
}
@ -159,6 +158,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
private static void Complete(ConnectionContext connection)
{
connection.Transport.Output.Complete();
connection.Transport.Input.Complete();
}
// This method does both Dispose and Start, the 'disposing' flag indicates which.
// The behaviors are nearly identical, except that the _disposed flag is set in the lock
// if we're disposing.
@ -661,7 +666,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
timeoutTimer?.Dispose();
// Dispose the connection
await connectionState.Connection.DisposeAsync();
Complete(connectionState.Connection);
// Cancel any outstanding invocations within the connection lock
connectionState.CancelOutstandingInvocations(connectionState.CloseException);
@ -833,7 +838,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>();
private int _nextId;
public IConnection Connection { get; }
public ConnectionContext Connection { get; }
public Task ReceiveTask { get; set; }
public Exception CloseException { get; set; }
@ -843,7 +848,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
set => _stopping = value;
}
public ConnectionState(IConnection connection, HubConnection hubConnection)
public ConnectionState(ConnectionContext connection, HubConnection hubConnection)
{
_hubConnection = hubConnection;
Connection = connection;

View File

@ -4,6 +4,7 @@
using System;
using System.ComponentModel;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.Extensions.DependencyInjection;
@ -40,10 +41,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
// The service provider is disposed by the HubConnection
var serviceProvider = Services.BuildServiceProvider();
var connectionFactory = serviceProvider.GetService<Func<IConnection>>();
var connectionFactory = serviceProvider.GetService<IConnectionFactory>();
if (connectionFactory == null)
{
throw new InvalidOperationException("Cannot create HubConnection instance. A connection was not configured.");
throw new InvalidOperationException($"Cannot create {nameof(HubConnection)} instance. An {nameof(IConnectionFactory)} was not configured.");
}
return serviceProvider.GetService<HubConnection>();

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.Extensions.DependencyInjection;
@ -12,9 +13,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public static class HubConnectionBuilderExtensions
{
public static IHubConnectionBuilder WithConnectionFactory(this IHubConnectionBuilder hubConnectionBuilder, Func<IConnection> connectionFactory)
public static IHubConnectionBuilder WithConnectionFactory(this IHubConnectionBuilder hubConnectionBuilder, Func<TransferFormat, Task<ConnectionContext>> connectionFactory)
{
hubConnectionBuilder.Services.AddSingleton(connectionFactory);
if (connectionFactory == null)
{
throw new ArgumentNullException(nameof(connectionFactory));
}
hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(new DelegateConnectionFactory(connectionFactory));
return hubConnectionBuilder;
}
@ -29,5 +34,20 @@ namespace Microsoft.AspNetCore.SignalR.Client
hubConnectionBuilder.Services.AddLogging(configureLogging);
return hubConnectionBuilder;
}
private class DelegateConnectionFactory : IConnectionFactory
{
private readonly Func<TransferFormat, Task<ConnectionContext>> _connectionFactory;
public DelegateConnectionFactory(Func<TransferFormat, Task<ConnectionContext>> connectionFactory)
{
_connectionFactory = connectionFactory;
}
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
{
return _connectionFactory(transferFormat);
}
}
}
}

View File

@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
namespace Microsoft.AspNetCore.SignalR.Client
{
public interface IConnectionFactory
{
Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat);
}
}

View File

@ -0,0 +1,55 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Microsoft.AspNetCore.SignalR.Client
{
public class HttpConnectionFactory : IConnectionFactory
{
private readonly HttpConnectionOptions _options;
private readonly ILoggerFactory _loggerFactory;
public HttpConnectionFactory(IOptions<HttpConnectionOptions> options, ILoggerFactory loggerFactory)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
if (loggerFactory == null)
{
throw new ArgumentNullException(nameof(loggerFactory));
}
_options = options.Value;
_loggerFactory = loggerFactory;
}
public async Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
{
var httpOptions = new HttpOptions
{
HttpMessageHandlerFactory = _options.MessageHandlerFactory,
Headers = _options._headers != null ? new ReadOnlyDictionary<string, string>(_options._headers) : null,
AccessTokenFactory = _options.AccessTokenFactory,
WebSocketOptions = _options.WebSocketOptions,
Cookies = _options._cookies,
Proxy = _options.Proxy,
UseDefaultCredentials = _options.UseDefaultCredentials,
ClientCertificates = _options._clientCertificates,
Credentials = _options.Credentials,
};
var connection = new HttpConnection(_options.Url, _options.Transports, _loggerFactory, httpOptions);
await connection.StartAsync(transferFormat);
return connection;
}
}
}

View File

@ -2,14 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.ObjectModel;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.Http.Connections.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Microsoft.AspNetCore.SignalR.Client
{
@ -79,32 +73,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
hubConnectionBuilder.Services.Configure(configureHttpConnection);
}
hubConnectionBuilder.Services.AddSingleton(services =>
{
var value = services.GetService<IOptions<HttpConnectionOptions>>().Value;
var httpOptions = new HttpOptions
{
HttpMessageHandlerFactory = value.MessageHandlerFactory,
Headers = value._headers != null ? new ReadOnlyDictionary<string, string>(value._headers) : null,
AccessTokenFactory = value.AccessTokenFactory,
WebSocketOptions = value.WebSocketOptions,
Cookies = value._cookies,
Proxy = value.Proxy,
UseDefaultCredentials = value.UseDefaultCredentials,
ClientCertificates = value._clientCertificates,
Credentials = value.Credentials,
};
Func<IConnection> createConnection = () => new HttpConnection(
value.Url,
value.Transports,
services.GetService<ILoggerFactory>(),
httpOptions);
return createConnection;
});
hubConnectionBuilder.Services.AddSingleton<IConnectionFactory, HttpConnectionFactory>();
return hubConnectionBuilder;
}
}

View File

@ -57,9 +57,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
return hubConnectionBuilder.Build();
}
private Func<IConnection> GetHttpConnectionFactory(ILoggerFactory loggerFactory, string path, HttpTransportType transportType)
private Func<TransferFormat, Task<ConnectionContext>> GetHttpConnectionFactory(ILoggerFactory loggerFactory, string path, HttpTransportType transportType)
{
return () => new HttpConnection(new Uri(_serverFixture.Url + path), transportType, loggerFactory);
return async format =>
{
var connection = new HttpConnection(new Uri(_serverFixture.Url + path), transportType, loggerFactory);
await connection.StartAsync(format);
return connection;
};
}
[Theory]

View File

@ -4,6 +4,7 @@
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.Extensions.Logging;

View File

@ -3,6 +3,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.Extensions.DependencyInjection;
@ -19,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public void HubConnectionBuiderThrowsIfConnectionFactoryNotConfigured()
{
var ex = Assert.Throws<InvalidOperationException>(() => new HubConnectionBuilder().Build());
Assert.Equal("Cannot create HubConnection instance. A connection was not configured.", ex.Message);
Assert.Equal("Cannot create HubConnection instance. An IConnectionFactory was not configured.", ex.Message);
}
[Fact]
@ -47,19 +48,28 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
[Fact]
public void WithConnectionFactorySetsConnectionFactory()
public async Task WithConnectionFactorySetsConnectionFactory()
{
Func<IConnection> connectionFactory = () => null;
var called = false;
Func<TransferFormat, Task<ConnectionContext>> connectionFactory = format =>
{
called = true;
return Task.FromResult<ConnectionContext>(null);
};
var serviceProvider = new HubConnectionBuilder().WithConnectionFactory(connectionFactory).Services.BuildServiceProvider();
Assert.Equal(connectionFactory, serviceProvider.GetService<Func<IConnection>>());
var factory = serviceProvider.GetService<IConnectionFactory>();
Assert.NotNull(factory);
Assert.False(called);
await factory.ConnectAsync(TransferFormat.Text);
Assert.True(called);
}
[Fact]
public void BuildCanOnlyBeCalledOnce()
{
var builder = new HubConnectionBuilder().WithConnectionFactory(() => null);
var builder = new HubConnectionBuilder().WithConnectionFactory(format => null);
Assert.NotNull(builder.Build());

View File

@ -25,10 +25,17 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public static IEnumerable<object[]> MethodsNamesThatRequireActiveConnection => MethodsThatRequireActiveConnection.Keys.Select(k => new object[] { k });
private HubConnection CreateHubConnection(Func<IConnection> connectionFactory)
private HubConnection CreateHubConnection(TestConnection testConnection)
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(connectionFactory);
builder.WithConnectionFactory(format => testConnection.StartAsync(format));
return builder.Build();
}
private HubConnection CreateHubConnection(Func<TransferFormat, Task<ConnectionContext>> connectionFactory)
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(format => connectionFactory(format));
return builder.Build();
}
@ -36,7 +43,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task StartAsyncStartsTheUnderlyingConnection()
{
var testConnection = new TestConnection();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
await connection.StartAsync();
Assert.True(testConnection.Started.IsCompleted);
@ -48,12 +55,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
// Set up StartAsync to wait on the syncPoint when starting
var testConnection = new TestConnection(onStart: SyncPoint.Create(out var syncPoint));
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
var firstStart = connection.StartAsync().OrTimeout();
Assert.False(firstStart.IsCompleted);
// Wait for us to be in IConnection.StartAsync
// Wait for us to be in IConnectionFactory.ConnectAsync
await syncPoint.WaitForSyncPoint();
// Try starting again
@ -74,10 +81,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
// Set up StartAsync to wait on the syncPoint when starting
var createCount = 0;
IConnection ConnectionFactory()
Task<ConnectionContext> ConnectionFactory(TransferFormat format)
{
createCount += 1;
return new TestConnection();
return new TestConnection().StartAsync(format);
}
await AsyncUsing(CreateHubConnection(ConnectionFactory), async connection =>
@ -91,44 +98,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
}
[Fact]
public async Task StartingDuringStopCreatesANewConnection()
{
// Set up StartAsync to wait on the syncPoint when starting
var createCount = 0;
var onDisposeForFirstConnection = SyncPoint.Create(out var syncPoint);
IConnection ConnectionFactory()
{
createCount += 1;
return new TestConnection(onDispose: createCount == 1 ? onDisposeForFirstConnection : null);
}
await AsyncUsing(CreateHubConnection(ConnectionFactory), async connection =>
{
await connection.StartAsync().OrTimeout();
Assert.Equal(1, createCount);
var stopTask = connection.StopAsync().OrTimeout();
// Wait to hit DisposeAsync on TestConnection (which should be after StopAsync has cleared the connection state)
await syncPoint.WaitForSyncPoint();
// We should be able to start now, and StopAsync hasn't completed, nor will it complete while Starting
Assert.False(stopTask.IsCompleted);
await connection.StartAsync().OrTimeout();
Assert.False(stopTask.IsCompleted);
// When we release the sync point, the StopAsync task will finish
syncPoint.Continue();
await stopTask;
});
}
[Fact]
public async Task StartAsyncWithFailedHandshakeCanBeStopped()
{
var testConnection = new TestConnection(autoHandshake: false);
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
testConnection.Transport.Input.Complete();
try
@ -150,7 +124,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var method = MethodsThatRequireActiveConnection[name];
var testConnection = new TestConnection();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => method(connection));
Assert.Equal($"The '{name}' method cannot be called if the connection is not active", ex.Message);
@ -165,7 +139,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// Set up StartAsync to wait on the syncPoint when starting
var testConnection = new TestConnection(onStart: SyncPoint.Create(out var syncPoint));
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
// Start, and wait for the sync point to be hit
var startTask = connection.StartAsync().OrTimeout();
@ -196,13 +170,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task StopAsyncStopsConnection()
{
var testConnection = new TestConnection();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
await connection.StartAsync().OrTimeout();
Assert.True(testConnection.Started.IsCompleted);
await connection.StopAsync().OrTimeout();
Assert.True(testConnection.Disposed.IsCompleted);
await testConnection.Disposed.OrTimeout();
});
}
@ -210,7 +184,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task StopAsyncNoOpsIfConnectionNotYetStarted()
{
var testConnection = new TestConnection();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
await connection.StopAsync().OrTimeout();
Assert.False(testConnection.Disposed.IsCompleted);
@ -221,13 +195,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task StopAsyncNoOpsIfConnectionAlreadyStopped()
{
var testConnection = new TestConnection();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
await connection.StartAsync().OrTimeout();
Assert.True(testConnection.Started.IsCompleted);
await connection.StopAsync().OrTimeout();
Assert.True(testConnection.Disposed.IsCompleted);
await testConnection.Disposed.OrTimeout();
await connection.StopAsync().OrTimeout();
});
@ -238,7 +212,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var testConnection = new TestConnection();
var closed = new TaskCompletionSource<object>();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
connection.Closed += (e) => closed.TrySetResult(null);
await connection.StartAsync().OrTimeout();
@ -260,7 +234,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var testConnection = new TestConnection();
var testConnectionClosed = new TaskCompletionSource<object>();
var connectionClosed = new TaskCompletionSource<object>();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
// We're hooking the TestConnection shutting down here because the HubConnection one will be blocked on the lock
testConnection.Transport.Input.OnWriterCompleted((_, __) => testConnectionClosed.TrySetResult(null), null);
@ -286,6 +260,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync("Foo").OrTimeout());
Assert.Equal($"The '{nameof(HubConnection.SendAsync)}' method cannot be called if the connection is not active", ex.Message);
await testConnection.Disposed.OrTimeout();
Assert.Equal(1, testConnection.DisposeCount);
});
}
@ -295,7 +271,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var testConnection = new TestConnection();
var connectionClosed = new TaskCompletionSource<object>();
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
connection.Closed += (e) => connectionClosed.TrySetResult(null);
@ -328,7 +304,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// Set up StartAsync to wait on the syncPoint when starting
var testConnection = new TestConnection(onDispose: SyncPoint.Create(out var syncPoint));
await AsyncUsing(CreateHubConnection(() => testConnection), async connection =>
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
await connection.StartAsync().OrTimeout();
@ -352,7 +328,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task ClientTimesoutWhenHandshakeResponseTakesTooLong()
{
var connection = new TestConnection(autoHandshake: false);
var hubConnection = CreateHubConnection(() => connection);
var hubConnection = CreateHubConnection(connection);
try
{
hubConnection.HandshakeTimeout = TimeSpan.FromMilliseconds(1);
@ -375,7 +351,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
onStartCalled = true;
return Task.CompletedTask;
});
var hubConnection = CreateHubConnection(() => connection);
var hubConnection = CreateHubConnection(connection);
try
{
await Assert.ThrowsAsync<OperationCanceledException>(() => hubConnection.StartAsync(new CancellationToken(canceled: true)).OrTimeout());
@ -397,7 +373,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
cts.Cancel();
return Task.CompletedTask;
}, autoHandshake: false);
var hubConnection = CreateHubConnection(() => connection);
var hubConnection = CreateHubConnection(connection);
// We want to make sure the cancellation is because of the token passed to StartAsync
hubConnection.HandshakeTimeout = Timeout.InfiniteTimeSpan;
try

View File

@ -13,7 +13,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
private static HubConnection CreateHubConnection(TestConnection connection, IHubProtocol protocol = null)
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(() => connection);
builder.WithConnectionFactory(async format =>
{
await connection.StartAsync(format);
return connection;
});
if (protocol != null)
{
builder.WithHubProtocol(protocol);

View File

@ -44,7 +44,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task ClosedEventRaisedWhenTheClientIsStopped()
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(() => new TestConnection());
builder.WithConnectionFactory(format => new TestConnection().StartAsync(format));
var hubConnection = builder.Build();
var closedEventTcs = new TaskCompletionSource<Exception>();

View File

@ -3,6 +3,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
@ -17,7 +18,7 @@ using Newtonsoft.Json;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
internal class TestConnection : IConnection
internal class TestConnection : ConnectionContext
{
private readonly bool _autoHandshake;
private readonly TaskCompletionSource<object> _started = new TaskCompletionSource<object>();
@ -31,12 +32,16 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
private readonly Func<Task> _onStart;
private readonly Func<Task> _onDispose;
public IDuplexPipe Application { get; }
public IDuplexPipe Transport { get; }
public override string ConnectionId { get; set; }
public IFeatureCollection Features { get; } = new FeatureCollection();
public IDuplexPipe Application { get; }
public override IDuplexPipe Transport { get; set; }
public override IFeatureCollection Features { get; } = new FeatureCollection();
public int DisposeCount => _disposeCount;
public override IDictionary<object, object> Items { get; set; } = new ConnectionItems();
public TestConnection(Func<Task> onStart = null, Func<Task> onDispose = null, bool autoHandshake = true, bool synchronousCallbacks = false)
{
_autoHandshake = autoHandshake;
@ -50,14 +55,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Application = pair.Application;
Transport = pair.Transport;
Application.Input.OnWriterCompleted((ex, _) => Application.Output.Complete(), null);
Application.Input.OnWriterCompleted((ex, _) =>
{
Application.Output.Complete();
_ = DisposeAsync();
},
null);
}
public Task DisposeAsync() => DisposeCoreAsync();
public Task StartAsync() => StartAsync(TransferFormat.Binary);
public async Task StartAsync(TransferFormat transferFormat)
public async Task<ConnectionContext> StartAsync(TransferFormat transferFormat = TransferFormat.Binary)
{
_started.TrySetResult(null);
@ -69,6 +78,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// HubConnection.StartAsync which sends the Handshake in the first place!
_ = ReadHandshakeAndSendResponseAsync();
}
return this;
}
public async Task<string> ReadHandshakeAndSendResponseAsync()