Transport Fallback (#1455)

This commit is contained in:
Mikael Mengistu 2018-03-08 06:49:03 +00:00 committed by GitHub
parent 98e270ea6a
commit adbd964efd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 171 additions and 34 deletions

View File

@ -95,11 +95,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
_httpConnectionClosed(logger, null);
}
public static void StartingTransport(ILogger logger, ITransport transport, Uri url)
public static void StartingTransport(ILogger logger, TransportType transportType, Uri url)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_startingTransport(logger, transport.GetType().Name, url, null);
_startingTransport(logger, transportType.ToString(), url, null);
}
}

View File

@ -47,6 +47,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
private PipeWriter Output => _transportChannel.Output;
private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
private readonly TransportType _requestedTransportType = TransportType.All;
private TransportType _serverTransports = TransportType.All;
// The order of the transports here is the order determines the fallback order.
private static readonly TransportType[] AllTransports = new[]{ TransportType.WebSockets, TransportType.ServerSentEvents, TransportType.LongPolling };
private readonly ConnectionLogScope _logScope;
private readonly IDisposable _scopeDisposable;
@ -142,6 +145,14 @@ namespace Microsoft.AspNetCore.Sockets.Client
return _startTcs.Task;
}
private async Task<NegotiationResponse> GetNegotiationResponse()
{
var negotiationResponse = await Negotiate(Url, _httpClient, _logger);
_connectionId = negotiationResponse.ConnectionId;
_logScope.ConnectionId = _connectionId;
return negotiationResponse;
}
private async Task StartAsyncInternal()
{
Log.HttpConnectionStarting(_logger);
@ -151,13 +162,12 @@ namespace Microsoft.AspNetCore.Sockets.Client
var connectUrl = Url;
if (_requestedTransportType == TransportType.WebSockets)
{
_transport = _transportFactory.CreateTransport(TransportType.WebSockets);
Log.StartingTransport(_logger, _requestedTransportType, connectUrl);
await StartTransport(connectUrl, _requestedTransportType);
}
else
{
var negotiationResponse = await Negotiate(Url, _httpClient, _logger);
_connectionId = negotiationResponse.ConnectionId;
_logScope.ConnectionId = _connectionId;
var negotiationResponse = await GetNegotiationResponse();
// Connection is being disposed while start was in progress
if (_connectionState == ConnectionState.Disposed)
@ -166,13 +176,43 @@ namespace Microsoft.AspNetCore.Sockets.Client
return;
}
_transport = _transportFactory.CreateTransport(GetAvailableServerTransports(negotiationResponse));
connectUrl = CreateConnectUrl(Url, negotiationResponse);
}
// This should only need to happen once
_serverTransports = GetAvailableServerTransports(negotiationResponse);
connectUrl = CreateConnectUrl(Url, negotiationResponse.ConnectionId);
Log.StartingTransport(_logger, _transport, connectUrl);
await StartTransport(connectUrl);
foreach (var transport in AllTransports)
{
try
{
if ((transport & _serverTransports & _requestedTransportType) != 0)
{
// The negotiation response gets cleared in the fallback scenario.
if (negotiationResponse == null)
{
negotiationResponse = await GetNegotiationResponse();
connectUrl = CreateConnectUrl(Url, negotiationResponse.ConnectionId);
}
Log.StartingTransport(_logger, transport, connectUrl);
await StartTransport(connectUrl, transport);
break;
}
}
catch (Exception)
{
// Try the next transport
// Clear the negotiation response so we know to re-negotiate.
negotiationResponse = null;
}
}
}
if (_transport == null)
{
throw new InvalidOperationException("Unable to connect to the server with any of the available transports.");
}
}
catch
{
// The connection can now be either in the Connecting or Disposed state - only change the state to
@ -314,21 +354,23 @@ namespace Microsoft.AspNetCore.Sockets.Client
return availableServerTransports;
}
private static Uri CreateConnectUrl(Uri url, NegotiationResponse negotiationResponse)
private static Uri CreateConnectUrl(Uri url, string connectionId)
{
if (string.IsNullOrWhiteSpace(negotiationResponse.ConnectionId))
if (string.IsNullOrWhiteSpace(connectionId))
{
throw new FormatException("Invalid connection id returned in negotiation response.");
throw new FormatException("Invalid connection id.");
}
return Utils.AppendQueryString(url, "id=" + negotiationResponse.ConnectionId);
return Utils.AppendQueryString(url, "id=" + connectionId);
}
private async Task StartTransport(Uri connectUrl)
private async Task StartTransport(Uri connectUrl, TransportType transportType)
{
var options = new PipeOptions(readerScheduler: PipeScheduler.ThreadPool);
var pair = DuplexPipe.CreateConnectionPair(options, options);
_transportChannel = pair.Transport;
_transport = _transportFactory.CreateTransport(transportType);
// Start the transport, giving it one end of the pipeline
try
@ -346,6 +388,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
catch (Exception ex)
{
Log.ErrorStartingTransport(_logger, _transport, ex);
_transport = null;
throw;
}
}

View File

@ -108,20 +108,22 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task CanStartConnectionThatFailedToStart()
[Theory]
[InlineData(2)]
[InlineData(3)]
public async Task TransportThatFailsToStartOnceFallsBack(int passThreshold)
{
using (StartLog(out var loggerFactory))
{
var startCounter = 0;
var expected = new Exception("Transport failed to start");
var shouldFail = true;
Task OnTransportStart()
{
if (shouldFail)
startCounter++;
if (startCounter < passThreshold)
{
// Succeed next time
shouldFail = false;
return Task.FromException(expected);
}
else
@ -136,17 +138,39 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
transport: new TestTransport(onTransportStart: OnTransportStart)),
async (connection, closed) =>
{
var actual = await Assert.ThrowsAsync<Exception>(() => connection.StartAsync());
Assert.Same(expected, actual);
// Should succeed this time
shouldFail = false;
await connection.StartAsync().OrTimeout();
Assert.Equal(0, startCounter);
await connection.StartAsync();
Assert.Equal(passThreshold, startCounter);
});
}
}
[Fact]
public async Task StartThrowsAfterAllTransportsFail()
{
using (StartLog(out var loggerFactory))
{
var startCounter = 0;
var expected = new Exception("Transport failed to start");
Task OnTransportStart()
{
startCounter++;
return Task.FromException(expected);
}
await WithConnectionAsync(
CreateConnection(
loggerFactory: loggerFactory,
transport: new TestTransport(onTransportStart: OnTransportStart)),
async (connection, closed) =>
{
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.StartAsync());
Assert.Equal("Unable to connect to the server with any of the available transports.", ex.Message);
Assert.Equal(3, startCounter);
});
}
}
[Fact]
public async Task CanStartStoppedConnection()
{
@ -214,9 +238,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var httpHandler = new TestHttpMessageHandler();
var longPollResult = new TaskCompletionSource<HttpResponseMessage>();
httpHandler.OnLongPoll(cancellationToken =>
{
cancellationToken.Register(() =>
httpHandler.OnLongPoll(cancellationToken =>
{
cancellationToken.Register(() =>
{
longPollResult.TrySetResult(ResponseUtils.CreateResponse(HttpStatusCode.NoContent));
});

View File

@ -26,7 +26,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact]
public Task StartThrowsFormatExceptionIfNegotiationResponseHasNoConnectionId()
{
return RunInvalidNegotiateResponseTest<FormatException>(ResponseUtils.CreateNegotiationContent(connectionId: null), "Invalid connection id returned in negotiation response.");
return RunInvalidNegotiateResponseTest<FormatException>(ResponseUtils.CreateNegotiationContent(connectionId: null), "Invalid connection id.");
}
[Fact]
@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[InlineData(TransportType.ServerSentEvents)]
public Task ConnectionCannotBeStartedIfNoCommonTransportsBetweenClientAndServer(TransportType serverTransports)
{
return RunInvalidNegotiateResponseTest<InvalidOperationException>(ResponseUtils.CreateNegotiationContent(transportTypes: serverTransports), "No requested transports available on the server.");
return RunInvalidNegotiateResponseTest<InvalidOperationException>(ResponseUtils.CreateNegotiationContent(transportTypes: serverTransports), "Unable to connect to the server with any of the available transports.");
}
[Theory]

View File

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
@ -14,6 +15,7 @@ using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Features;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.AspNetCore.WebUtilities;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Moq;
@ -55,6 +57,19 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await connection.DisposeAsync().OrTimeout();
}
[Fact]
public async Task TransportThatFallsbackCreatesNewConnection()
{
var url = _serverFixture.Url + "/echo";
// The test should connect to the server using WebSockets transport on Windows 8 and newer.
// On Windows 7/2008R2 it should use ServerSentEvents transport to connect to the server.
// The test logic lives in the TestTransportFactory and FakeTransport.
var connection = new HttpConnection(new Uri(url), new TestTransportFactory(), null, null);
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
[Theory]
[MemberData(nameof(TransportTypes))]
public async Task CanStartAndStopConnectionUsingGivenTransport(TransportType transportType)
@ -333,7 +348,6 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var logger = loggerFactory.CreateLogger<EndToEndTests>();
var url = _serverFixture.Url + "/uncreatable";
var connection = new HubConnectionBuilder()
.WithUrl(new Uri(url))
.WithTransport(transportType)
@ -384,6 +398,62 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
// Serves a fake transport that lets us verify fallback behavior
private class TestTransportFactory : ITransportFactory
{
private ITransport _transport;
public ITransport CreateTransport(TransportType availableServerTransports)
{
if (_transport == null)
{
_transport = new FakeTransport();
}
return _transport;
}
}
private class FakeTransport : ITransport
{
public TransferMode? Mode => TransferMode.Text;
public string prevConnectionId = null;
private int tries = 0;
private IDuplexPipe _application;
public Task StartAsync(Uri url, IDuplexPipe application, TransferMode requestedTransferMode, IConnection connection)
{
_application = application;
tries++;
Assert.True(QueryHelpers.ParseQuery(url.Query.ToString()).TryGetValue("id", out var id));
if (prevConnectionId == null)
{
prevConnectionId = id;
}
else
{
Assert.True(prevConnectionId != id);
prevConnectionId = id;
}
if (tries < 3)
{
throw new Exception();
}
else
{
return Task.CompletedTask;
}
}
public Task StopAsync()
{
_application.Output.Complete();
_application.Input.Complete();
return Task.CompletedTask;
}
}
public static IEnumerable<object[]> TransportTypes
{
get