diff --git a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs index 82e6c9cc89..d02cc0bfb6 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs @@ -16,14 +16,15 @@ namespace Microsoft.AspNetCore.Sockets.Client { private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; + private volatile int _connectionState = ConnectionState.Initial; - private volatile IChannelConnection _transportChannel; + private volatile IChannelConnection _transportChannel; private volatile ITransport _transport; private volatile Task _receiveLoopTask; private volatile Task _startTask = Task.CompletedTask; private ReadableChannel Input => _transportChannel.Input; - private WritableChannel Output => _transportChannel.Output; + private WritableChannel Output => _transportChannel.Output; public Uri Url { get; } @@ -146,11 +147,11 @@ namespace Microsoft.AspNetCore.Sockets.Client private async Task StartTransport(Uri connectUrl) { - var applicationToTransport = Channel.CreateUnbounded(); + var applicationToTransport = Channel.CreateUnbounded(); var transportToApplication = Channel.CreateUnbounded(); - var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport); + var applicationSide = new ChannelConnection(applicationToTransport, transportToApplication); - _transportChannel = new ChannelConnection(applicationToTransport, transportToApplication); + _transportChannel = new ChannelConnection(transportToApplication, applicationToTransport); // Start the transport, giving it one end of the pipeline try @@ -194,12 +195,12 @@ namespace Microsoft.AspNetCore.Sockets.Client _logger.LogTrace("Ending receive loop"); } - public Task SendAsync(byte[] data, MessageType type) + public Task SendAsync(byte[] data, MessageType type) { return SendAsync(data, type, CancellationToken.None); } - public async Task SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken) + public async Task SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken) { if (data == null) { @@ -208,20 +209,25 @@ namespace Microsoft.AspNetCore.Sockets.Client if (_connectionState != ConnectionState.Connected) { - return false; + throw new InvalidOperationException( + "Cannot send messages when the connection is not in the Connected state."); } - var message = new Message(data, type); + // TaskCreationOptions.RunContinuationsAsynchronously ensures that continuations awaiting + // SendAsync (i.e. user's code) are not running on the same thread as the code that sets + // TaskCompletionSource result. This way we prevent from user's code blocking our channel + // send loop. + var sendTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var message = new SendMessage(data, type, sendTcs); while (await Output.WaitToWriteAsync(cancellationToken)) { if (Output.TryWrite(message)) { - return true; + await sendTcs.Task; + break; } } - - return false; } public async Task DisposeAsync() diff --git a/src/Microsoft.AspNetCore.Sockets.Client/IConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client/IConnection.cs index 524e3de548..977c9cee65 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/IConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/IConnection.cs @@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Sockets.Client public interface IConnection { Task StartAsync(ITransport transport, HttpClient httpClient); - Task SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken); + Task SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken); Task DisposeAsync(); event Action Connected; diff --git a/src/Microsoft.AspNetCore.Sockets.Client/ITransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/ITransport.cs index 1790b7457b..40bc5b7a1a 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/ITransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/ITransport.cs @@ -8,7 +8,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { public interface ITransport { - Task StartAsync(Uri url, IChannelConnection application); + Task StartAsync(Uri url, IChannelConnection application); Task StopAsync(); } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs index a0cfceab64..bc8ae517f1 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs @@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Client private readonly HttpClient _httpClient; private readonly ILogger _logger; - private IChannelConnection _application; + private IChannelConnection _application; private Task _sender; private Task _poller; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); @@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Client _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); } - public Task StartAsync(Uri url, IChannelConnection application) + public Task StartAsync(Uri url, IChannelConnection application) { _application = application; @@ -145,12 +145,14 @@ namespace Microsoft.AspNetCore.Sockets.Client private async Task SendMessages(Uri sendUrl, CancellationToken cancellationToken) { + TaskCompletionSource sendTcs = null; try { while (await _application.Input.WaitToReadAsync(cancellationToken)) { - while (!cancellationToken.IsCancellationRequested && _application.Input.TryRead(out Message message)) + while (!cancellationToken.IsCancellationRequested && _application.Input.TryRead(out SendMessage message)) { + sendTcs = message.SendResult; var request = new HttpRequestMessage(HttpMethod.Post, sendUrl); request.Headers.UserAgent.Add(DefaultUserAgentHeader); @@ -161,16 +163,19 @@ namespace Microsoft.AspNetCore.Sockets.Client var response = await _httpClient.SendAsync(request); response.EnsureSuccessStatusCode(); + sendTcs.SetResult(null); } } } catch (OperationCanceledException) { // transport is being closed + sendTcs?.TrySetCanceled(); } catch (Exception ex) { _logger.LogError("Error while sending to '{0}': {1}", sendUrl, ex); + sendTcs?.TrySetException(ex); throw; } finally diff --git a/src/Microsoft.AspNetCore.Sockets.Client/SendMessage.cs b/src/Microsoft.AspNetCore.Sockets.Client/SendMessage.cs new file mode 100644 index 0000000000..c248c0bf05 --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets.Client/SendMessage.cs @@ -0,0 +1,21 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Sockets.Client +{ + public struct SendMessage + { + public MessageType Type { get; } + public byte[] Payload { get; } + public TaskCompletionSource SendResult { get; } + + public SendMessage(byte[] payload, MessageType type, TaskCompletionSource result) + { + Type = type; + Payload = payload; + SendResult = result; + } + } +} diff --git a/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs index 17253d3f8e..7d7a537acd 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs @@ -15,7 +15,7 @@ namespace Microsoft.AspNetCore.Sockets.Client public class WebSocketsTransport : ITransport { private ClientWebSocket _webSocket = new ClientWebSocket(); - private IChannelConnection _application; + private IChannelConnection _application; private CancellationToken _cancellationToken = new CancellationToken(); private readonly ILogger _logger; @@ -26,12 +26,12 @@ namespace Microsoft.AspNetCore.Sockets.Client public WebSocketsTransport(ILoggerFactory loggerFactory) { - _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger("WebSocketsTransport"); + _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(nameof(WebSocketsTransport)); } public Task Running { get; private set; } = Task.CompletedTask; - public async Task StartAsync(Uri url, IChannelConnection application) + public async Task StartAsync(Uri url, IChannelConnection application) { if (url == null) { @@ -121,21 +121,29 @@ namespace Microsoft.AspNetCore.Sockets.Client { while (await _application.Input.WaitToReadAsync(cancellationToken)) { - Message message; - while (_application.Input.TryRead(out message)) + while (_application.Input.TryRead(out SendMessage message)) { try { await _webSocket.SendAsync(new ArraySegment(message.Payload), message.Type == MessageType.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary, true, cancellationToken); + message.SendResult.SetResult(null); } catch (OperationCanceledException ex) { _logger?.LogError(ex.Message); + message.SendResult.SetCanceled(); await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken); break; } + catch (Exception ex) + { + _logger?.LogError(ex.Message); + message.SendResult.SetException(ex); + await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken); + throw; + } } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Common/IChannelConnection.cs b/src/Microsoft.AspNetCore.Sockets.Common/IChannelConnection.cs index 3e918d5898..2fb0ae7255 100644 --- a/src/Microsoft.AspNetCore.Sockets.Common/IChannelConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Common/IChannelConnection.cs @@ -8,9 +8,13 @@ namespace Microsoft.AspNetCore.Sockets { // REVIEW: These should probably move to Channels. Why not use IChannel? Because I think it's better to be clear that this is providing // access to two separate channels, the read end for one and the write end for the other. - public interface IChannelConnection : IDisposable + public interface IChannelConnection : IChannelConnection { - ReadableChannel Input { get; } - WritableChannel Output { get; } + } + + public interface IChannelConnection : IDisposable + { + ReadableChannel Input { get; } + WritableChannel Output { get; } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Common/Internal/ChannelConnection.cs b/src/Microsoft.AspNetCore.Sockets.Common/Internal/ChannelConnection.cs index 892cb7deba..6f70be4504 100644 --- a/src/Microsoft.AspNetCore.Sockets.Common/Internal/ChannelConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Common/Internal/ChannelConnection.cs @@ -7,21 +7,33 @@ namespace Microsoft.AspNetCore.Sockets.Internal { public static class ChannelConnection { + public static ChannelConnection Create(Channel input, Channel output) + { + return new ChannelConnection(input, output); + } + public static ChannelConnection Create(Channel input, Channel output) { return new ChannelConnection(input, output); } } - public class ChannelConnection : IChannelConnection + public class ChannelConnection : ChannelConnection, IChannelConnection { - public Channel Input { get; } - public Channel Output { get; } - - ReadableChannel IChannelConnection.Input => Input; - WritableChannel IChannelConnection.Output => Output; - public ChannelConnection(Channel input, Channel output) + : base(input, output) + { } + } + + public class ChannelConnection : IChannelConnection + { + public Channel Input { get; } + public Channel Output { get; } + + ReadableChannel IChannelConnection.Input => Input; + WritableChannel IChannelConnection.Output => Output; + + public ChannelConnection(Channel input, Channel output) { Input = input; Output = output; diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs index cde22d8340..40490c7f4f 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs @@ -30,8 +30,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - Application = ChannelConnection.Create(input: applicationToTransport, output: transportToApplication); - var transport = ChannelConnection.Create(input: transportToApplication, output: applicationToTransport); + Application = ChannelConnection.Create(input: applicationToTransport, output: transportToApplication); + var transport = ChannelConnection.Create(input: transportToApplication, output: applicationToTransport); Connection = new Connection(Guid.NewGuid().ToString(), transport); Connection.Metadata["formatType"] = format; diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs index 2f7d675e95..b94257359b 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs @@ -144,19 +144,21 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests releaseDisposeTcs.SetResult(null); await disposeTask.OrTimeout(); - transport.Verify(t => t.StartAsync(It.IsAny(), It.IsAny>()), Times.Never); + transport.Verify(t => t.StartAsync(It.IsAny(), It.IsAny>()), Times.Never); } } [Fact] - public async Task SendReturnsFalseIfConnectionIsNotStarted() + public async Task SendThrowsIfConnectionIsNotStarted() { var connection = new Connection(new Uri("http://fakeuri.org/")); - Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary)); + var exception = await Assert.ThrowsAsync( + async () => await connection.SendAsync(new byte[0], MessageType.Binary)); + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); } [Fact] - public async Task SendReturnsFalseIfConnectionIsDisposed() + public async Task SendThrowsIfConnectionIsDisposed() { var mockHttpHandler = new Mock(); mockHttpHandler.Protected() @@ -175,7 +177,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests await connection.StartAsync(longPollingTransport, httpClient); await connection.DisposeAsync(); - Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary)); + var exception = await Assert.ThrowsAsync( + async () => await connection.SendAsync(new byte[0], MessageType.Binary)); + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); } } @@ -224,7 +228,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests }); var mockTransport = new Mock(); - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) + mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) .Returns(Task.FromException(new InvalidOperationException("Transport failed to start"))); using (var httpClient = new HttpClient(mockHttpHandler.Object)) @@ -249,7 +253,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests } [Fact] - public async Task ClosedEventRaisedWhenTheClientIsStopped() + public async Task ClosedEventRaisedWhenTheClientIsBeingStopped() { var mockHttpHandler = new Mock(); mockHttpHandler.Protected() @@ -393,6 +397,79 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests } } + [Fact] + public async Task SendAsyncThrowsIfConnectionIsNotStarted() + { + var connection = new Connection(new Uri("http://fakeuri.org/")); + var exception = await Assert.ThrowsAsync( + async () => await connection.SendAsync(new byte[0], MessageType.Binary)); + + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); + } + + [Fact] + public async Task SendAsyncThrowsIfConnectionIsDisposed() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + + var content = string.Empty; + if (request.RequestUri.AbsolutePath.EndsWith("/poll")) + { + content = "T2:T:42;"; + } + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(content) }; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + { + var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); + var connection = new Connection(new Uri("http://fakeuri.org/")); + + await connection.StartAsync(longPollingTransport, httpClient); + await connection.DisposeAsync(); + + var exception = await Assert.ThrowsAsync( + async () => await connection.SendAsync(new byte[0], MessageType.Binary)); + + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); + } + } + + [Fact] + public async Task CallerReceivesExceptionsFromSendAsync() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + if (request.RequestUri.AbsolutePath.EndsWith("/send")) + { + return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) }; + } + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + { + var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); + var connection = new Connection(new Uri("http://fakeuri.org/")); + + await connection.StartAsync(longPollingTransport, httpClient); + + var exception = await Assert.ThrowsAsync( + async () => await connection.SendAsync(new byte[0], MessageType.Binary)); + + await connection.DisposeAsync(); + } + } + [Fact] public async Task CanReceiveData() { @@ -443,29 +520,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests } } - [Fact] - public async Task CannotSendAfterConnectionIsStopped() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; - }); - - using (var httpClient = new HttpClient(mockHttpHandler.Object)) - { - var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); - var connection = new Connection(new Uri("http://fakeuri.org/")); - - await connection.StartAsync(longPollingTransport, httpClient); - await connection.DisposeAsync(); - Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary)); - } - } - [Fact] public async Task CannotSendAfterReceiveThrewException() { @@ -496,46 +550,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests // Exception in send should shutdown the connection await closeTcs.Task.OrTimeout(); - Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary)); - } - finally - { - await connection.DisposeAsync(); - } - } - } + var exception = await Assert.ThrowsAsync( + async () => await connection.SendAsync(new byte[0], MessageType.Binary)); - [Fact] - public async Task CannotReceiveAfterReceiveThrewException() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - if (request.RequestUri.AbsolutePath.EndsWith("/poll")) - { - return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) }; - } - return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; - }); - - using (var httpClient = new HttpClient(mockHttpHandler.Object)) - { - var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); - var connection = new Connection(new Uri("http://fakeuri.org/")); - try - { - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => closeTcs.TrySetResult(e); - - await connection.StartAsync(longPollingTransport, httpClient); - - // Exception in send should shutdown the connection - await closeTcs.Task.OrTimeout(); - - Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary)); + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); } finally { diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs index bdc2f0915f..97dd13b463 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs @@ -38,9 +38,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests try { - var connectionToTransport = Channel.CreateUnbounded(); + var connectionToTransport = Channel.CreateUnbounded(); var transportToConnection = Channel.CreateUnbounded(); - var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); + var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection); transportActiveTask = longPollingTransport.Running; @@ -74,9 +74,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); try { - var connectionToTransport = Channel.CreateUnbounded(); + var connectionToTransport = Channel.CreateUnbounded(); var transportToConnection = Channel.CreateUnbounded(); - var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); + var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection); await longPollingTransport.Running.OrTimeout(); @@ -106,9 +106,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); try { - var connectionToTransport = Channel.CreateUnbounded(); + var connectionToTransport = Channel.CreateUnbounded(); var transportToConnection = Channel.CreateUnbounded(); - var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); + var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection); var exception = @@ -142,12 +142,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); try { - var connectionToTransport = Channel.CreateUnbounded(); + var connectionToTransport = Channel.CreateUnbounded(); var transportToConnection = Channel.CreateUnbounded(); - var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); + var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection); - await connectionToTransport.Out.WriteAsync(new Message()); + await connectionToTransport.Out.WriteAsync(new SendMessage()); await Assert.ThrowsAsync(async () => await longPollingTransport.Running.OrTimeout()); @@ -183,9 +183,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()); try { - var connectionToTransport = Channel.CreateUnbounded(); + var connectionToTransport = Channel.CreateUnbounded(); var transportToConnection = Channel.CreateUnbounded(); - var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); + var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection); connectionToTransport.Out.Complete();