Adding transport factory

This commit is contained in:
moozzyk 2017-04-04 10:20:51 -07:00
parent 0546dc21f4
commit 04719dee82
13 changed files with 347 additions and 197 deletions

View File

@ -2,12 +2,10 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.Extensions.Logging;
namespace ClientSample
@ -26,44 +24,40 @@ namespace ClientSample
loggerFactory.AddConsole(LogLevel.Debug);
var logger = loggerFactory.CreateLogger<Program>();
using (var httpClient = new HttpClient(new LoggingMessageHandler(loggerFactory, new HttpClientHandler())))
logger.LogInformation("Connecting to {0}", baseUrl);
var connection = new HubConnection(new Uri(baseUrl), new JsonNetInvocationAdapter(), loggerFactory);
try
{
logger.LogInformation("Connecting to {0}", baseUrl);
var transport = new LongPollingTransport(httpClient, loggerFactory);
var connection = new HubConnection(new Uri(baseUrl), new JsonNetInvocationAdapter(), loggerFactory);
try
await connection.StartAsync();
logger.LogInformation("Connected to {0}", baseUrl);
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, a) =>
{
await connection.StartAsync(transport, httpClient);
logger.LogInformation("Connected to {0}", baseUrl);
a.Cancel = true;
logger.LogInformation("Stopping loops...");
cts.Cancel();
};
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, a) =>
{
a.Cancel = true;
logger.LogInformation("Stopping loops...");
cts.Cancel();
};
// Set up handler
connection.On("Send", new[] { typeof(string) }, a =>
{
var message = (string)a[0];
Console.WriteLine("RECEIVED: " + message);
});
while (!cts.Token.IsCancellationRequested)
{
var line = Console.ReadLine();
logger.LogInformation("Sending: {0}", line);
await connection.Invoke<object>("Send", line);
}
}
finally
// Set up handler
connection.On("Send", new[] { typeof(string) }, a =>
{
await connection.DisposeAsync();
var message = (string)a[0];
Console.WriteLine("RECEIVED: " + message);
});
while (!cts.Token.IsCancellationRequested)
{
var line = Console.ReadLine();
logger.LogInformation("Sending: {0}", line);
await connection.Invoke<object>("Send", line);
}
}
finally
{
await connection.DisposeAsync();
}
}
}
}

View File

@ -18,7 +18,7 @@ namespace ClientSample
{
public static async Task MainAsync(string[] args)
{
if(args.Contains("--debug"))
if (args.Contains("--debug"))
{
Console.WriteLine($"Ready for debugger to attach. Process ID: {Process.GetCurrentProcess().Id}");
Console.Write("Press ENTER to Continue");
@ -36,34 +36,30 @@ namespace ClientSample
loggerFactory.AddConsole(LogLevel.Debug);
var logger = loggerFactory.CreateLogger<Program>();
using (var httpClient = new HttpClient(new LoggingMessageHandler(loggerFactory, new HttpClientHandler())))
logger.LogInformation("Connecting to {0}", baseUrl);
var connection = new Connection(new Uri(baseUrl), loggerFactory);
try
{
logger.LogInformation("Connecting to {0}", baseUrl);
var transport = new LongPollingTransport(httpClient, loggerFactory);
var connection = new Connection(new Uri(baseUrl), loggerFactory);
try
var cts = new CancellationTokenSource();
connection.Received += (data, format) => logger.LogInformation($"Received: {Encoding.UTF8.GetString(data)}");
connection.Closed += e => cts.Cancel();
await connection.StartAsync();
logger.LogInformation("Connected to {0}", baseUrl);
Console.CancelKeyPress += (sender, a) =>
{
var cts = new CancellationTokenSource();
connection.Received += (data, format) => logger.LogInformation($"Received: {Encoding.UTF8.GetString(data)}");
connection.Closed += e => cts.Cancel();
a.Cancel = true;
logger.LogInformation("Stopping loops...");
cts.Cancel();
};
await connection.StartAsync(transport, httpClient);
logger.LogInformation("Connected to {0}", baseUrl);
Console.CancelKeyPress += (sender, a) =>
{
a.Cancel = true;
logger.LogInformation("Stopping loops...");
cts.Cancel();
};
await StartSending(loggerFactory.CreateLogger("SendLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel());
}
finally
{
await connection.DisposeAsync();
}
await StartSending(loggerFactory.CreateLogger("SendLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel());
}
finally
{
await connection.DisposeAsync();
}
}

View File

@ -19,11 +19,14 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public class HubConnection
{
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private readonly IConnection _connection;
private readonly IInvocationAdapter _adapter;
private readonly HubBinder _binder;
private HttpClient _httpClient;
private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource();
// We need to ensure pending calls added after a connection failure don't hang. Right now the easiest thing to do is lock.
@ -68,23 +71,36 @@ namespace Microsoft.AspNetCore.SignalR.Client
_connection = connection;
_binder = new HubBinder(this);
_adapter = adapter;
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<HubConnection>();
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<HubConnection>();
_connection.Received += OnDataReceived;
_connection.Closed += Shutdown;
}
public Task StartAsync() => StartAsync(null, null);
public Task StartAsync(HttpClient httpClient) => StartAsync(null, httpClient);
public Task StartAsync(ITransport transport) => StartAsync(transport, null);
public Task StartAsync() => StartAsync(TransportType.All, httpClient: null);
public Task StartAsync(HttpClient httpClient) => StartAsync(TransportType.All, httpClient: httpClient);
public Task StartAsync(TransportType transportType) => StartAsync(transportType, httpClient: null);
public async Task StartAsync(ITransport transport, HttpClient httpClient)
public async Task StartAsync(TransportType transportType, HttpClient httpClient)
{
await _connection.StartAsync(transport, httpClient);
if (httpClient == null)
{
// We are creating the client so store it to be able to dispose
_httpClient = httpClient = new HttpClient();
}
await _connection.StartAsync(new DefaultTransportFactory(transportType, _loggerFactory, httpClient), httpClient);
}
public async Task StartAsync(ITransportFactory transportFactory, HttpClient httpClient)
{
await _connection.StartAsync(transportFactory, httpClient);
}
public async Task DisposeAsync()
{
await _connection.DisposeAsync();
_httpClient?.Dispose();
}
// TODO: Client return values/tasks?

View File

@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
private volatile int _connectionState = ConnectionState.Initial;
private volatile IChannelConnection<Message, SendMessage> _transportChannel;
private HttpClient _httpClient;
private volatile ITransport _transport;
private volatile Task _receiveLoopTask;
private TaskCompletionSource<object> _startTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
@ -46,11 +47,22 @@ namespace Microsoft.AspNetCore.Sockets.Client
_logger = _loggerFactory.CreateLogger<Connection>();
}
public Task StartAsync() => StartAsync(transport: null, httpClient: null);
public Task StartAsync(HttpClient httpClient) => StartAsync(transport: null, httpClient: httpClient);
public Task StartAsync(ITransport transport) => StartAsync(transport: transport, httpClient: null);
public Task StartAsync() => StartAsync(transportFactory: null, httpClient: null);
public Task StartAsync(HttpClient httpClient) => StartAsync(transportFactory: null, httpClient: httpClient);
public Task StartAsync(ITransportFactory transportFactory) => StartAsync(transportFactory, httpClient: null);
public Task StartAsync(TransportType transportType) => StartAsync(transportType, httpClient: null);
public Task StartAsync(TransportType transportType, HttpClient httpClient)
{
if (httpClient == null)
{
// We are creating the client so store it to be able to dispose
_httpClient = httpClient = new HttpClient();
}
public Task StartAsync(ITransport transport, HttpClient httpClient)
return StartAsync(new DefaultTransportFactory(transportType, _loggerFactory, httpClient), httpClient);
}
public Task StartAsync(ITransportFactory transportFactory, HttpClient httpClient)
{
if (Interlocked.CompareExchange(ref _connectionState, ConnectionState.Connecting, ConnectionState.Initial)
!= ConnectionState.Initial)
@ -59,7 +71,13 @@ namespace Microsoft.AspNetCore.Sockets.Client
new InvalidOperationException("Cannot start a connection that is not in the Initial state."));
}
StartAsyncInternal(transport, httpClient)
if (httpClient == null)
{
// We are creating the client so store it to be able to dispose
_httpClient = httpClient = new HttpClient();
}
StartAsyncInternal(transportFactory ?? new DefaultTransportFactory(TransportType.All, _loggerFactory, httpClient), httpClient)
.ContinueWith(t =>
{
if (t.IsFaulted)
@ -79,7 +97,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
return _startTcs.Task;
}
private async Task StartAsyncInternal(ITransport transport, HttpClient httpClient)
private async Task StartAsyncInternal(ITransportFactory transportFactory, HttpClient httpClient)
{
_logger.LogDebug("Starting connection.");
@ -94,9 +112,10 @@ namespace Microsoft.AspNetCore.Sockets.Client
return;
}
_transport = transport ?? new WebSocketsTransport(_loggerFactory);
// TODO: Available server transports should be sent by the server in the negotiation response
_transport = transportFactory.CreateTransport(TransportType.All);
_logger.LogDebug("Starting transport '{0}' with Url: {1}", transport.GetType().Name, connectUrl);
_logger.LogDebug("Starting transport '{0}' with Url: {1}", _transport.GetType().Name, connectUrl);
await StartTransport(connectUrl);
}
catch
@ -140,6 +159,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
_logger.LogDebug("Draining event queue");
await _eventQueue.Drain();
_httpClient?.Dispose();
_logger.LogDebug("Raising Closed event");
// Do not "simplify" - event handlers can be removed from a different thread
@ -160,20 +181,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
private static async Task<Uri> GetConnectUrl(Uri url, HttpClient httpClient, ILogger logger)
{
var disposeHttpClient = httpClient == null;
httpClient = httpClient ?? new HttpClient();
try
{
var connectionId = await GetConnectionId(url, httpClient, logger);
return Utils.AppendQueryString(url, "id=" + connectionId);
}
finally
{
if (disposeHttpClient)
{
httpClient.Dispose();
}
}
var connectionId = await GetConnectionId(url, httpClient, logger);
return Utils.AppendQueryString(url, "id=" + connectionId);
}
private static async Task<string> GetConnectionId(Uri url, HttpClient httpClient, ILogger logger)
@ -335,6 +344,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
await _receiveLoopTask;
}
_httpClient?.Dispose();
}
private class ConnectionState

View File

@ -0,0 +1,53 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net.Http;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Client
{
public class DefaultTransportFactory : ITransportFactory
{
private readonly HttpClient _httpClient;
private readonly TransportType _requestedTransportType;
private readonly ILoggerFactory _loggerFactory;
public DefaultTransportFactory(TransportType requestedTransportType, ILoggerFactory loggerFactory, HttpClient httpClient)
{
if (requestedTransportType <= 0 || requestedTransportType > TransportType.All)
{
throw new ArgumentOutOfRangeException(nameof(requestedTransportType));
}
if (httpClient == null)
{
throw new ArgumentNullException(nameof(httpClient));
}
_requestedTransportType = requestedTransportType;
_loggerFactory = loggerFactory;
_httpClient = httpClient;
}
public ITransport CreateTransport(TransportType availableServerTransports)
{
if ((availableServerTransports & TransportType.WebSockets & _requestedTransportType) == TransportType.WebSockets)
{
return new WebSocketsTransport(_loggerFactory);
}
if ((availableServerTransports & TransportType.ServerSentEvents & _requestedTransportType) == TransportType.ServerSentEvents)
{
throw new NotImplementedException();
}
if ((availableServerTransports & TransportType.LongPolling & _requestedTransportType) == TransportType.LongPolling)
{
return new LongPollingTransport(_httpClient, _loggerFactory);
}
throw new InvalidOperationException("No requested transports available on the server.");
}
}
}

View File

@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
public interface IConnection
{
Task StartAsync(ITransport transport, HttpClient httpClient);
Task StartAsync(ITransportFactory transportFactory, HttpClient httpClient);
Task SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken);
Task DisposeAsync();

View File

@ -0,0 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Net.Http;
namespace Microsoft.AspNetCore.Sockets.Client
{
public interface ITransportFactory
{
ITransport CreateTransport(TransportType availableServerTransports);
}
}

View File

@ -1,17 +1,16 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
using Xunit;
using System.Diagnostics;
namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
@ -51,11 +50,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
var connection = new HubConnection(new Uri("http://test/hubs"));
try
{
await connection.StartAsync(transport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
var result = await connection.Invoke<string>("HelloWorld");
@ -76,11 +74,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
var connection = new HubConnection(new Uri("http://test/hubs"));
try
{
await connection.StartAsync(transport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
var result = await connection.Invoke<string>("Echo", originalMessage);
@ -101,11 +98,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory);
try
{
await connection.StartAsync(transport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
var result = await connection.Invoke<string>("echo", originalMessage);
@ -126,11 +122,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
var connection = new HubConnection(new Uri("http://test/hubs"));
try
{
await connection.StartAsync(transport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
var tcs = new TaskCompletionSource<string>();
connection.On("Echo", new[] { typeof(string) }, a =>
@ -156,11 +151,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
var connection = new HubConnection(new Uri("http://test/hubs"));
try
{
await connection.StartAsync(transport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
var ex = await Assert.ThrowsAnyAsync<Exception>(
async () => await connection.Invoke<object>("!@#$%"));

View File

@ -36,6 +36,16 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
Assert.Equal(connectionUrl, new Connection(connectionUrl).Url);
}
[Theory]
[InlineData(0)]
[InlineData(TransportType.All + 1)]
public async Task CannotStartConnectionWithInvalidTransportType(TransportType requestedTransportType)
{
var connection = new Connection(new Uri("http://fakeuri.org/"));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
() => connection.StartAsync(requestedTransportType));
}
[Fact]
public async Task CannotStartRunningConnection()
{
@ -50,14 +60,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(longPollingTransport));
async () => await connection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
finally
@ -81,14 +90,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
await connection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(longPollingTransport));
async () => await connection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
@ -99,12 +107,11 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
using (var httpClient = new HttpClient())
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(longPollingTransport));
async () => await connection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
@ -138,7 +145,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
transport.Setup(t => t.StopAsync()).Returns(async () => { await releaseDisposeTcs.Task; });
var connection = new Connection(new Uri("http://fakeuri.org/"));
var startTask = connection.StartAsync(transport.Object, httpClient);
var startTask = connection.StartAsync(new TestTransportFactory(transport.Object), httpClient);
await allowDisposeTcs.Task;
var disposeTask = connection.DisposeAsync();
// allow StartAsync to continue once DisposeAsync has started
@ -176,10 +183,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
await connection.DisposeAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
@ -202,14 +208,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
var connectedEventRaisedTcs = new TaskCompletionSource<object>();
connection.Connected += () => connectedEventRaisedTcs.SetResult(null);
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
await connectedEventRaisedTcs.Task.OrTimeout();
}
@ -246,7 +251,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
connection.Connected += () => connectedEventRaised = true;
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(mockTransport.Object, httpClient));
async () => await connection.StartAsync(new TestTransportFactory(mockTransport.Object), httpClient));
}
finally
{
@ -271,13 +276,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.SetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
await connection.DisposeAsync();
// in case of clean disconnect error should be null
@ -303,14 +307,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.TrySetResult(e);
try
{
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
Assert.IsType<HttpRequestException>(await closedEventTcs.Task.OrTimeout());
}
finally
@ -356,7 +359,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
var receivedInvoked = false;
connection.Received += (m, t) => receivedInvoked = true;
await connection.StartAsync(mockTransport.Object, httpClient);
await connection.StartAsync(new TestTransportFactory(mockTransport.Object), httpClient);
await connection.DisposeAsync();
Assert.False(receivedInvoked);
}
@ -407,7 +410,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
};
connection.Closed += e => closedTcs.SetResult(null);
await connection.StartAsync(mockTransport.Object, httpClient);
await connection.StartAsync(new TestTransportFactory(mockTransport.Object), httpClient);
channel.Output.TryWrite(new Message());
channel.Output.TryWrite(new Message());
await allowDisposeTcs.Task.OrTimeout();
@ -449,7 +452,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
try
{
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(new TestTransportFactory(longPollingTransport), httpClient);
Assert.False(longPollingTransport.Running.IsCompleted);
}
@ -485,11 +488,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
await connection.SendAsync(data, MessageType.Binary);
@ -532,10 +534,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
await connection.DisposeAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
@ -563,10 +564,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
var exception = await Assert.ThrowsAsync<HttpRequestException>(
async () => await connection.SendAsync(new byte[0], MessageType.Binary));
@ -595,8 +595,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
@ -614,7 +612,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
}
};
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
Assert.Equal("42", await receiveTcs.Task.OrTimeout());
}
@ -643,14 +641,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
var closeTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closeTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(TransportType.LongPolling, httpClient);
// Exception in send should shutdown the connection
await closeTcs.Task.OrTimeout();

View File

@ -2,7 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
@ -10,11 +11,11 @@ using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.AspNetCore.Sockets.Client.Tests;
using Microsoft.Extensions.Logging;
using Moq;
using Moq.Protected;
using Xunit;
using System.IO;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
@ -44,20 +45,19 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
return ResponseUtils.CreateResponse(System.Net.HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org/"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
try
{
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.StartAsync(TransportType.LongPolling, httpClient);
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.StartAsync(longPollingTransport));
async () => await hubConnection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
finally
@ -76,19 +76,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
return ResponseUtils.CreateResponse(System.Net.HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org/"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.StartAsync(TransportType.LongPolling, httpClient);
await hubConnection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.StartAsync(longPollingTransport));
async () => await hubConnection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
@ -126,7 +125,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns(Task.FromException(exception));
var hubConnection = new HubConnection(mockConnection.Object, mockInvocationAdapter.Object, null);
await hubConnection.StartAsync(Mock.Of<ITransport>());
await hubConnection.StartAsync(TransportType.All, httpClient: null);
var actualException =
await Assert.ThrowsAsync<InvalidOperationException>(async () => await hubConnection.Invoke<int>("test"));
@ -142,19 +141,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
return ResponseUtils.CreateResponse(System.Net.HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
try
{
var connectedEventRaisedTcs = new TaskCompletionSource<object>();
hubConnection.Connected += () => connectedEventRaisedTcs.SetResult(null);
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.StartAsync(TransportType.LongPolling, httpClient);
await connectedEventRaisedTcs.Task.OrTimeout();
}
@ -174,17 +172,16 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
return ResponseUtils.CreateResponse(System.Net.HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
var closedEventTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e => closedEventTcs.SetResult(e);
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.StartAsync(TransportType.LongPolling, httpClient);
await hubConnection.DisposeAsync();
Assert.Null(await closedEventTcs.Task.OrTimeout());
@ -202,7 +199,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var hubConnection = new HubConnection(mockConnection.Object, Mock.Of<IInvocationAdapter>(), new LoggerFactory());
await hubConnection.StartAsync(Mock.Of<ITransport>());
await hubConnection.StartAsync(new TestTransportFactory(Mock.Of<ITransport>()), httpClient: null);
await hubConnection.DisposeAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.Invoke("test", typeof(int)));
@ -221,7 +218,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var hubConnection = new HubConnection(mockConnection.Object, Mock.Of<IInvocationAdapter>(), new LoggerFactory());
await hubConnection.StartAsync(Mock.Of<ITransport>());
await hubConnection.StartAsync(new TestTransportFactory(Mock.Of<ITransport>()), httpClient: null);
var invokeTask = hubConnection.Invoke("testMethod", typeof(int));
await hubConnection.DisposeAsync();
@ -240,7 +237,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var hubConnection = new HubConnection(mockConnection.Object, Mock.Of<IInvocationAdapter>(), new LoggerFactory());
await hubConnection.StartAsync(Mock.Of<ITransport>());
await hubConnection.StartAsync(new TestTransportFactory(Mock.Of<ITransport>()), httpClient: null);
var invokeTask = hubConnection.Invoke("testMethod", typeof(int));
await hubConnection.DisposeAsync();
@ -266,7 +263,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns(Task.FromResult((InvocationMessage)invocationDescriptor));
var hubConnection = new HubConnection(mockConnection.Object, mockInvocationAdapter.Object, null);
await hubConnection.StartAsync(Mock.Of<ITransport>());
await hubConnection.StartAsync(new TestTransportFactory(Mock.Of<ITransport>()), httpClient: null);
mockConnection.Raise(c => c.Received += null, new object[] { new byte[] { }, MessageType.Text });
mockInvocationAdapter.Verify(a => a.ReadMessageAsync(It.IsAny<Stream>(), It.IsAny<IInvocationBinder>(), It.IsAny<CancellationToken>()), Times.Once());

View File

@ -0,0 +1,23 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Net.Http;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
public class TestTransportFactory : ITransportFactory
{
private readonly ITransport _transport;
public TestTransportFactory(ITransport transport)
{
_transport = transport;
}
public ITransport CreateTransport(TransportType availableServerTransports)
{
return _transport;
}
}
}

View File

@ -0,0 +1,66 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net.Http;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.Extensions.Logging;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Tests
{
public class DefaultTransportFactoryTests
{
[Theory]
[InlineData(0)]
[InlineData(TransportType.All + 1)]
public void DefaultTransportFactoryCannotBeCreatedWithInvalidTransportType(TransportType transportType)
{
Assert.Throws<ArgumentOutOfRangeException>(
() => new DefaultTransportFactory(transportType, new LoggerFactory(), new HttpClient()));
}
[Fact]
public void DefaultTransportFactoryCannotBeCreatedWithoutHttpClient()
{
var exception = Assert.Throws<ArgumentNullException>(
() => new DefaultTransportFactory(TransportType.All, new LoggerFactory(), httpClient: null));
Assert.Equal(exception.ParamName, "httpClient");
}
[Theory]
[InlineData(TransportType.WebSockets, typeof(WebSocketsTransport))]
[InlineData(TransportType.LongPolling, typeof(LongPollingTransport))]
public void DefaultTransportFactoryCreatesRequestedTransportIfAvailable(TransportType requestedTransport, Type expectedTransportType)
{
var transportFactory = new DefaultTransportFactory(requestedTransport, loggerFactory: null, httpClient: new HttpClient());
Assert.IsType(expectedTransportType,
transportFactory.CreateTransport(TransportType.All));
}
[Theory]
[InlineData(TransportType.WebSockets)]
[InlineData(TransportType.ServerSentEvents)]
[InlineData(TransportType.LongPolling)]
[InlineData(TransportType.All)]
public void DefaultTransportFactoryThrowsIfItCannotCreateRequestedTransport(TransportType requestedTransport)
{
var transportFactory =
new DefaultTransportFactory(requestedTransport, loggerFactory: null, httpClient: new HttpClient());
var ex = Assert.Throws<InvalidOperationException>(
() => transportFactory.CreateTransport(~requestedTransport));
Assert.Equal("No requested transports available on the server.", ex.Message);
}
[Fact]
public void DefaultTransportFactoryCreatesWebSocketsTransportIfAvailable()
{
Assert.IsType<WebSocketsTransport>(
new DefaultTransportFactory(TransportType.All, loggerFactory: null, httpClient: new HttpClient())
.CreateTransport(TransportType.All));
}
}
}

View File

@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
@ -63,58 +62,44 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
public static IEnumerable<object[]> Transports
{
get
{
yield return new object[] { new Func<ILoggerFactory, ITransport>(loggerFactory => new WebSocketsTransport(loggerFactory)) };
yield return new object[] { new Func<ILoggerFactory, ITransport>(loggerFactory => new LongPollingTransport(new HttpClient(), loggerFactory)) };
}
}
[ConditionalTheory]
[OSSkipCondition(OperatingSystems.Windows, WindowsVersions.Win7, WindowsVersions.Win2008R2, SkipReason = "No WebSockets Client for this platform")]
[MemberData(nameof(Transports))]
public async Task ConnectionCanSendAndReceiveMessages(Func<ILoggerFactory, ITransport> transportFactory)
[MemberData(nameof(TransportTypes))]
public async Task ConnectionCanSendAndReceiveMessages(TransportType transportType)
{
const string message = "Major Key";
var baseUrl = _serverFixture.BaseUrl;
var loggerFactory = new LoggerFactory();
loggerFactory.AddXunit(_output, LogLevel.Trace);
var transport = transportFactory(loggerFactory);
using (var httpClient = new HttpClient())
var connection = new ClientConnection(new Uri(baseUrl + "/echo"), loggerFactory);
try
{
var connection = new ClientConnection(new Uri(baseUrl + "/echo"), loggerFactory);
try
{
var receiveTcs = new TaskCompletionSource<string>();
connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
connection.Closed += e =>
var receiveTcs = new TaskCompletionSource<string>();
connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
connection.Closed += e =>
{
if (e != null)
{
if (e != null)
{
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetResult(null);
}
};
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetResult(null);
}
};
await connection.StartAsync(transport, httpClient);
await connection.StartAsync(transportType);
await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text);
await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text);
var receiveData = new ReceiveData();
var receiveData = new ReceiveData();
Assert.Equal(message, await receiveTcs.Task.OrTimeout());
}
finally
{
await connection.DisposeAsync();
}
Assert.Equal(message, await receiveTcs.Task.OrTimeout());
}
finally
{
await connection.DisposeAsync();
}
}
@ -139,11 +124,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var connection = new ClientConnection(new Uri(baseUrl + "/echo"), loggerFactory);
try
{
var transport = new WebSocketsTransport();
var receiveTcs = new TaskCompletionSource<byte[]>();
connection.Received += (data, messageType) => receiveTcs.SetResult(data);
await connection.StartAsync(transport);
await connection.StartAsync(TransportType.WebSockets);
await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text);
@ -157,5 +141,12 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await connection.DisposeAsync();
}
}
public static IEnumerable<object[]> TransportTypes() =>
new[]
{
new object[] { TransportType.WebSockets },
new object[] { TransportType.LongPolling }
};
}
}