From a84ba8820fe7b7bce6bf0c49950c463016c06012 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Fri, 23 Jun 2017 09:52:35 -0700 Subject: [PATCH] Use Channel as the abstraction (#579) * Use Channel as the abstraction --- .../EchoEndPoint.cs | 2 +- .../PersistentConnectionLifeTimeManager.cs | 3 +- .../SocialWeather/SocialWeatherEndPoint.cs | 4 +- .../EndPoints/MessagesEndPoint.cs | 6 +-- .../RedisHubLifetimeManager.cs | 4 +- .../DefaultHubLifetimeManager.cs | 4 +- .../HubEndPoint.cs | 8 ++-- .../ChannelConnection.cs | 31 ++++++++++---- .../ConnectionContext.cs | 6 +-- .../DefaultConnectionContext.cs | 22 ++++++---- .../IChannelConnection.cs | 20 --------- .../HttpConnection.cs | 11 +++-- .../ITransport.cs | 3 +- .../LongPollingTransport.cs | 11 ++--- .../SendUtils.cs | 7 ++-- .../ServerSentEventsTransport.cs | 13 +++--- .../WebSocketsTransport.cs | 19 +++++---- .../HttpConnectionDispatcher.cs | 10 ++--- .../Transports/WebSocketsTransport.cs | 15 +++---- .../ConnectionManager.cs | 4 +- .../ConnectionTests.cs | 27 ++++++------ .../LongPollingTransportTests.cs | 2 +- .../EchoEndPoint.cs | 2 +- .../TestClient.cs | 15 +++---- .../ConnectionManagerTests.cs | 4 +- .../HttpConnectionDispatcherTests.cs | 18 ++++---- .../WebSocketsTests.cs | 42 +++++++++---------- 27 files changed, 160 insertions(+), 153 deletions(-) delete mode 100644 src/Microsoft.AspNetCore.Sockets.Abstractions/IChannelConnection.cs diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs index 835bbbd2f0..e08b972bd1 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs +++ b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs @@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server { public async override Task OnConnectedAsync(ConnectionContext connection) { - await connection.Transport.Output.WriteAsync(await connection.Transport.Input.ReadAsync()); + await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync()); } } } diff --git a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs index 9a5d7da288..75bed615b3 100644 --- a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs +++ b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs @@ -5,7 +5,6 @@ using System; using System.Collections.Generic; using System.IO; using System.Threading.Tasks; -using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Sockets; namespace SocialWeather @@ -40,7 +39,7 @@ namespace SocialWeather var ms = new MemoryStream(); await formatter.WriteAsync(data, ms); - connection.Transport.Output.TryWrite(ms.ToArray()); + connection.Transport.Out.TryWrite(ms.ToArray()); } } diff --git a/samples/SocialWeather/SocialWeatherEndPoint.cs b/samples/SocialWeather/SocialWeatherEndPoint.cs index 2e8e4fa7da..9aa3fb8e58 100644 --- a/samples/SocialWeather/SocialWeatherEndPoint.cs +++ b/samples/SocialWeather/SocialWeatherEndPoint.cs @@ -34,9 +34,9 @@ namespace SocialWeather var formatter = _formatterResolver.GetFormatter( connection.Metadata.Get("formatType")); - while (await connection.Transport.Input.WaitToReadAsync()) + while (await connection.Transport.In.WaitToReadAsync()) { - if (connection.Transport.Input.TryRead(out var buffer)) + if (connection.Transport.In.TryRead(out var buffer)) { var stream = new MemoryStream(); await stream.WriteAsync(buffer, 0, buffer.Length); diff --git a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs index eee54395e2..5559e56518 100644 --- a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs +++ b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs @@ -20,9 +20,9 @@ namespace SocketsSample.EndPoints try { - while (await connection.Transport.Input.WaitToReadAsync()) + while (await connection.Transport.In.WaitToReadAsync()) { - if (connection.Transport.Input.TryRead(out var buffer)) + if (connection.Transport.In.TryRead(out var buffer)) { // We can avoid the copy here but we'll deal with that later var text = Encoding.UTF8.GetString(buffer); @@ -50,7 +50,7 @@ namespace SocketsSample.EndPoints foreach (var c in Connections) { - tasks.Add(c.Transport.Output.WriteAsync(payload)); + tasks.Add(c.Transport.Out.WriteAsync(payload)); } return Task.WhenAll(tasks); diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs index 13308c2224..a276fde7e7 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs @@ -312,9 +312,9 @@ namespace Microsoft.AspNetCore.SignalR.Redis var protocol = connection.Metadata.Get(HubConnectionMetadataNames.HubProtocol); var data = protocol.WriteToArray(hubMessage); - while (await connection.Transport.Output.WaitToWriteAsync()) + while (await connection.Transport.Out.WaitToWriteAsync()) { - if (connection.Transport.Output.TryWrite(data)) + if (connection.Transport.Out.TryWrite(data)) { break; } diff --git a/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs index 4d1ee56803..95958f960c 100644 --- a/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs @@ -124,9 +124,9 @@ namespace Microsoft.AspNetCore.SignalR var protocol = connection.Metadata.Get(HubConnectionMetadataNames.HubProtocol); var payload = protocol.WriteToArray(hubMessage); - while (await connection.Transport.Output.WaitToWriteAsync()) + while (await connection.Transport.Out.WaitToWriteAsync()) { - if (connection.Transport.Output.TryWrite(payload)) + if (connection.Transport.Out.TryWrite(payload)) { break; } diff --git a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs index cd04d3acd7..696dc7408f 100644 --- a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs +++ b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs @@ -154,9 +154,9 @@ namespace Microsoft.AspNetCore.SignalR try { - while (await connection.Transport.Input.WaitToReadAsync(cts.Token)) + while (await connection.Transport.In.WaitToReadAsync(cts.Token)) { - while (connection.Transport.Input.TryRead(out var buffer)) + while (connection.Transport.In.TryRead(out var buffer)) { if (protocol.TryParseMessages(buffer, this, out var hubMessages)) { @@ -232,9 +232,9 @@ namespace Microsoft.AspNetCore.SignalR { var payload = protocol.WriteToArray(hubMessage); - while (await connection.Transport.Output.WaitToWriteAsync()) + while (await connection.Transport.Out.WaitToWriteAsync()) { - if (connection.Transport.Output.TryWrite(payload)) + if (connection.Transport.Out.TryWrite(payload)) { return; } diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs index 6f70be4504..101d988e5e 100644 --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using System.Threading.Tasks.Channels; namespace Microsoft.AspNetCore.Sockets.Internal @@ -18,20 +19,36 @@ namespace Microsoft.AspNetCore.Sockets.Internal } } - public class ChannelConnection : ChannelConnection, IChannelConnection + public class ChannelConnection : Channel, IDisposable { + public Channel Input { get; } + public Channel Output { get; } + + public override ReadableChannel In => Input; + + public override WritableChannel Out => Output; + public ChannelConnection(Channel input, Channel output) - : base(input, output) - { } + { + Input = input; + Output = output; + } + + public void Dispose() + { + Input.Out.TryComplete(); + Output.Out.TryComplete(); + } } - public class ChannelConnection : IChannelConnection + public class ChannelConnection : Channel, IDisposable { public Channel Input { get; } public Channel Output { get; } - ReadableChannel IChannelConnection.Input => Input; - WritableChannel IChannelConnection.Output => Output; + public override ReadableChannel In => Input; + + public override WritableChannel Out => Output; public ChannelConnection(Channel input, Channel output) { @@ -41,8 +58,8 @@ namespace Microsoft.AspNetCore.Sockets.Internal public void Dispose() { - Output.Out.TryComplete(); Input.Out.TryComplete(); + Output.Out.TryComplete(); } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs index 84f1ea3d16..56e233b8c7 100644 --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs @@ -2,10 +2,8 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; using System.Security.Claims; -using System.Text; -using System.Threading; +using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.Http.Features; namespace Microsoft.AspNetCore.Sockets @@ -22,6 +20,6 @@ namespace Microsoft.AspNetCore.Sockets public abstract ConnectionMetadata Metadata { get; } // TEMPORARY - public abstract IChannelConnection Transport { get; set; } + public abstract Channel Transport { get; set; } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/DefaultConnectionContext.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/DefaultConnectionContext.cs index 44afc6c1f9..a23b1224ac 100644 --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/DefaultConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/DefaultConnectionContext.cs @@ -5,6 +5,7 @@ using System; using System.Security.Claims; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.Http.Features; namespace Microsoft.AspNetCore.Sockets @@ -15,7 +16,7 @@ namespace Microsoft.AspNetCore.Sockets // on the same task private TaskCompletionSource _disposeTcs = new TaskCompletionSource(); - public DefaultConnectionContext(string id, IChannelConnection transport, IChannelConnection application) + public DefaultConnectionContext(string id, Channel transport, Channel application) { Transport = transport; Application = application; @@ -43,9 +44,9 @@ namespace Microsoft.AspNetCore.Sockets public override ConnectionMetadata Metadata { get; } = new ConnectionMetadata(); - public IChannelConnection Application { get; } + public Channel Application { get; } - public override IChannelConnection Transport { get; set; } + public override Channel Transport { get; set; } public async Task DisposeAsync() { @@ -66,17 +67,22 @@ namespace Microsoft.AspNetCore.Sockets // If the application task is faulted, propagate the error to the transport if (ApplicationTask?.IsFaulted == true) { - Transport.Output.TryComplete(ApplicationTask.Exception.InnerException); + Transport.Out.TryComplete(ApplicationTask.Exception.InnerException); + } + else + { + Transport.Out.TryComplete(); } // If the transport task is faulted, propagate the error to the application if (TransportTask?.IsFaulted == true) { - Application.Output.TryComplete(TransportTask.Exception.InnerException); + Application.Out.TryComplete(TransportTask.Exception.InnerException); + } + else + { + Application.Out.TryComplete(); } - - Transport.Dispose(); - Application.Dispose(); var applicationTask = ApplicationTask ?? Task.CompletedTask; var transportTask = TransportTask ?? Task.CompletedTask; diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/IChannelConnection.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/IChannelConnection.cs deleted file mode 100644 index 2fb0ae7255..0000000000 --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/IChannelConnection.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Threading.Tasks.Channels; - -namespace Microsoft.AspNetCore.Sockets -{ - // REVIEW: These should probably move to Channels. Why not use IChannel? Because I think it's better to be clear that this is providing - // access to two separate channels, the read end for one and the write end for the other. - public interface IChannelConnection : IChannelConnection - { - } - - public interface IChannelConnection : IDisposable - { - ReadableChannel Input { get; } - WritableChannel Output { get; } - } -} diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs index 0a5c101b9c..1c8d4cde63 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs @@ -21,7 +21,7 @@ namespace Microsoft.AspNetCore.Sockets.Client private readonly ILogger _logger; private volatile int _connectionState = ConnectionState.Initial; - private volatile IChannelConnection _transportChannel; + private volatile ChannelConnection _transportChannel; private readonly HttpClient _httpClient; private volatile ITransport _transport; private volatile Task _receiveLoopTask; @@ -29,8 +29,8 @@ namespace Microsoft.AspNetCore.Sockets.Client private TaskQueue _eventQueue = new TaskQueue(); private readonly ITransportFactory _transportFactory; - private ReadableChannel Input => _transportChannel.Input; - private WritableChannel Output => _transportChannel.Output; + private ReadableChannel Input => _transportChannel.In; + private WritableChannel Output => _transportChannel.Out; public Uri Url { get; } @@ -250,9 +250,8 @@ namespace Microsoft.AspNetCore.Sockets.Client { var applicationToTransport = Channel.CreateUnbounded(); var transportToApplication = Channel.CreateUnbounded(); - var applicationSide = new ChannelConnection(applicationToTransport, transportToApplication); - - _transportChannel = new ChannelConnection(transportToApplication, applicationToTransport); + var applicationSide = ChannelConnection.Create(applicationToTransport, transportToApplication); + _transportChannel = ChannelConnection.Create(transportToApplication, applicationToTransport); // Start the transport, giving it one end of the pipeline try diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs index 51e2b17159..deb3b2f56e 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs @@ -3,12 +3,13 @@ using System; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; namespace Microsoft.AspNetCore.Sockets.Client { public interface ITransport { - Task StartAsync(Uri url, IChannelConnection application); + Task StartAsync(Uri url, Channel application); Task StopAsync(); } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs index f115c67e00..5bb572d5c5 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs @@ -6,6 +6,7 @@ using System.Net; using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -15,7 +16,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { private readonly HttpClient _httpClient; private readonly ILogger _logger; - private IChannelConnection _application; + private Channel _application; private Task _sender; private Task _poller; @@ -33,7 +34,7 @@ namespace Microsoft.AspNetCore.Sockets.Client _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); } - public Task StartAsync(Uri url, IChannelConnection application) + public Task StartAsync(Uri url, Channel application) { _logger.LogInformation("Starting {0}", nameof(LongPollingTransport)); @@ -47,7 +48,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { _logger.LogDebug("Transport stopped. Exception: '{0}'", t.Exception?.InnerException); - _application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); + _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); return t; }).Unwrap(); @@ -100,9 +101,9 @@ namespace Microsoft.AspNetCore.Sockets.Client var payload = await response.Content.ReadAsByteArrayAsync(); if (payload.Length > 0) { - while (!_application.Output.TryWrite(payload)) + while (!_application.Out.TryWrite(payload)) { - if (cancellationToken.IsCancellationRequested || !await _application.Output.WaitToWriteAsync(cancellationToken)) + if (cancellationToken.IsCancellationRequested || !await _application.Out.WaitToWriteAsync(cancellationToken)) { return; } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs index 98aa50d539..e9ba0aedba 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs @@ -8,6 +8,7 @@ using System.Net.Http; using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Sockets.Client @@ -17,17 +18,17 @@ namespace Microsoft.AspNetCore.Sockets.Client private static readonly string DefaultUserAgent = "Microsoft.AspNetCore.SignalR.Client/0.0.0"; public static readonly ProductInfoHeaderValue DefaultUserAgentHeader = ProductInfoHeaderValue.Parse(DefaultUserAgent); - public static async Task SendMessages(Uri sendUrl, IChannelConnection application, HttpClient httpClient, CancellationTokenSource transportCts, ILogger logger) + public static async Task SendMessages(Uri sendUrl, Channel application, HttpClient httpClient, CancellationTokenSource transportCts, ILogger logger) { logger.LogInformation("Starting the send loop"); IList messages = null; try { - while (await application.Input.WaitToReadAsync(transportCts.Token)) + while (await application.In.WaitToReadAsync(transportCts.Token)) { // Grab as many messages as we can from the channel messages = new List(); - while (!transportCts.Token.IsCancellationRequested && application.Input.TryRead(out SendMessage message)) + while (!transportCts.Token.IsCancellationRequested && application.In.TryRead(out SendMessage message)) { messages.Add(message); } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs index 2bc1acbf5a..a1a1fd48ba 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs @@ -7,6 +7,7 @@ using System.Net.Http; using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.Sockets.Internal.Formatters; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -20,7 +21,7 @@ namespace Microsoft.AspNetCore.Sockets.Client private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser(); - private IChannelConnection _application; + private Channel _application; public Task Running { get; private set; } = Task.CompletedTask; @@ -39,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Client _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); } - public Task StartAsync(Uri url, IChannelConnection application) + public Task StartAsync(Uri url, Channel application) { _logger.LogInformation("Starting {transportName}", nameof(ServerSentEventsTransport)); @@ -54,14 +55,14 @@ namespace Microsoft.AspNetCore.Sockets.Client _logger.LogError(0, t.Exception.InnerException, "Transport stopped"); } - _application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); + _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); return t; }).Unwrap(); return Task.CompletedTask; } - private async Task OpenConnection(IChannelConnection application, Uri url, CancellationToken cancellationToken) + private async Task OpenConnection(Channel application, Uri url, CancellationToken cancellationToken) { _logger.LogInformation("Starting receive loop"); @@ -94,7 +95,7 @@ namespace Microsoft.AspNetCore.Sockets.Client switch (parseResult) { case ServerSentEventsMessageParser.ParseResult.Completed: - _application.Output.TryWrite(buffer); + _application.Out.TryWrite(buffer); _parser.Reset(); break; case ServerSentEventsMessageParser.ParseResult.Incomplete: @@ -122,7 +123,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { _logger.LogInformation("Transport {transportName} is stopping", nameof(ServerSentEventsTransport)); _transportCts.Cancel(); - _application.Output.TryComplete(); + _application.Out.TryComplete(); await Running; } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs index 8b7030213a..82dac49b6d 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs @@ -2,11 +2,12 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Collections.Generic; using System.Diagnostics; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; -using System.Collections.Generic; +using System.Threading.Tasks.Channels; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -15,7 +16,7 @@ namespace Microsoft.AspNetCore.Sockets.Client public class WebSocketsTransport : ITransport { private readonly ClientWebSocket _webSocket = new ClientWebSocket(); - private IChannelConnection _application; + private Channel _application; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); private readonly ILogger _logger; @@ -31,7 +32,7 @@ namespace Microsoft.AspNetCore.Sockets.Client public Task Running { get; private set; } = Task.CompletedTask; - public async Task StartAsync(Uri url, IChannelConnection application) + public async Task StartAsync(Uri url, Channel application) { _logger.LogInformation("Starting {0}", nameof(WebSocketsTransport)); @@ -57,7 +58,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { _logger.LogDebug("Transport stopped. Exception: '{0}'", t.Exception?.InnerException); - _application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); + _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); return t; }).Unwrap(); } @@ -84,7 +85,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { _logger.LogInformation("Websocket closed by the server. Close status {0}", receiveResult.CloseStatus); - _application.Output.Complete( + _application.Out.Complete( receiveResult.CloseStatus == WebSocketCloseStatus.NormalClosure ? null : new InvalidOperationException( @@ -119,9 +120,9 @@ namespace Microsoft.AspNetCore.Sockets.Client } _logger.LogInformation("Passing message to application. Payload size: {0}", messageBuffer.Length); - while (await _application.Output.WaitToWriteAsync(_transportCts.Token)) + while (await _application.Out.WaitToWriteAsync(_transportCts.Token)) { - if (_application.Output.TryWrite(messageBuffer)) + if (_application.Out.TryWrite(messageBuffer)) { incomingMessage.Clear(); break; @@ -146,9 +147,9 @@ namespace Microsoft.AspNetCore.Sockets.Client try { - while (await _application.Input.WaitToReadAsync(_transportCts.Token)) + while (await _application.In.WaitToReadAsync(_transportCts.Token)) { - while (_application.Input.TryRead(out SendMessage message)) + while (_application.In.TryRead(out SendMessage message)) { try { diff --git a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs index e5a5b70d17..e7db22909c 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs @@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.Sockets } // We only need to provide the Input channel since writing to the application is handled through /send. - var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory); + var sse = new ServerSentEventsTransport(connection.Application.In, connection.ConnectionId, _loggerFactory); await DoPersistentConnection(socketDelegate, sse, context, connection); } @@ -184,7 +184,7 @@ namespace Microsoft.AspNetCore.Sockets context.Response.RegisterForDispose(timeoutSource); context.Response.RegisterForDispose(tokenSource); - var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, connection.ConnectionId, _loggerFactory); + var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.In, connection.ConnectionId, _loggerFactory); // Start the transport connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token); @@ -206,7 +206,7 @@ namespace Microsoft.AspNetCore.Sockets if (resultTask == connection.ApplicationTask) { // Complete the transport (notifying it of the application error if there is one) - connection.Transport.Output.TryComplete(connection.ApplicationTask.Exception); + connection.Transport.Out.TryComplete(connection.ApplicationTask.Exception); // Wait for the transport to run await connection.TransportTask; @@ -384,9 +384,9 @@ namespace Microsoft.AspNetCore.Sockets } _logger.ReceivedBytes(connection.ConnectionId, buffer.Length); - while (!connection.Application.Output.TryWrite(buffer)) + while (!connection.Application.Out.TryWrite(buffer)) { - if (!await connection.Application.Output.WaitToWriteAsync()) + if (!await connection.Application.Out.WaitToWriteAsync()) { return; } diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs index 9f78d2209c..bc41807e85 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs @@ -7,6 +7,7 @@ using System.Diagnostics; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; @@ -16,10 +17,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports { private readonly WebSocketOptions _options; private readonly ILogger _logger; - private readonly IChannelConnection _application; + private readonly Channel _application; private readonly string _connectionId; - public WebSocketsTransport(WebSocketOptions options, IChannelConnection application, string connectionId, ILoggerFactory loggerFactory) + public WebSocketsTransport(WebSocketOptions options, Channel application, string connectionId, ILoggerFactory loggerFactory) { if (options == null) { @@ -80,7 +81,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports } // We're done writing - _application.Output.TryComplete(); + _application.Out.TryComplete(); await socket.CloseOutputAsync(failed ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); @@ -153,9 +154,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports } _logger.MessageToApplication(_connectionId, messageBuffer.Length); - while (await _application.Output.WaitToWriteAsync()) + while (await _application.Out.WaitToWriteAsync()) { - if (_application.Output.TryWrite(messageBuffer)) + if (_application.Out.TryWrite(messageBuffer)) { incomingMessage.Clear(); break; @@ -166,10 +167,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports private async Task StartSending(WebSocket ws) { - while (await _application.Input.WaitToReadAsync()) + while (await _application.In.WaitToReadAsync()) { // Get a frame from the application - while (_application.Input.TryRead(out var buffer)) + while (_application.In.TryRead(out var buffer)) { if (buffer.Length > 0) { diff --git a/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs b/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs index 3dcac4856b..7ca38dea79 100644 --- a/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs +++ b/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs @@ -54,8 +54,8 @@ namespace Microsoft.AspNetCore.Sockets var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - var transportSide = new ChannelConnection(applicationToTransport, transportToApplication); - var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport); + var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication); + var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport); var connection = new DefaultConnectionContext(id, applicationSide, transportSide); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ConnectionTests.cs index 45cc5e8355..0dbfb9fbb7 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ConnectionTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ConnectionTests.cs @@ -7,6 +7,7 @@ using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.Client.Tests; using Microsoft.AspNetCore.SignalR.Tests.Common; using Microsoft.Extensions.Logging; @@ -151,7 +152,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests releaseDisposeTcs.SetResult(null); await disposeTask.OrTimeout(); - transport.Verify(t => t.StartAsync(It.IsAny(), It.IsAny>()), Times.Never); + transport.Verify(t => t.StartAsync(It.IsAny(), It.IsAny>()), Times.Never); } [Fact] @@ -234,7 +235,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests }); var mockTransport = new Mock(); - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) + mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) .Returns(Task.FromException(new InvalidOperationException("Transport failed to start"))); @@ -330,9 +331,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests }); var mockTransport = new Mock(); - IChannelConnection channel = null; - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) - .Returns>((url, c) => + Channel channel = null; + mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) + .Returns>((url, c) => { channel = c; return Task.CompletedTask; @@ -342,8 +343,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests { // The connection is now in the Disconnected state so the Received event for // this message should not be raised - channel.Output.TryWrite(Array.Empty()); - channel.Output.TryComplete(); + channel.Out.TryWrite(Array.Empty()); + channel.Out.TryComplete(); return Task.CompletedTask; }); @@ -372,9 +373,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests }); var mockTransport = new Mock(); - IChannelConnection channel = null; - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) - .Returns>((url, c) => + Channel channel = null; + mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) + .Returns>((url, c) => { channel = c; return Task.CompletedTask; @@ -382,7 +383,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests mockTransport.Setup(t => t.StopAsync()) .Returns(() => { - channel.Output.TryComplete(); + channel.Out.TryComplete(); return Task.CompletedTask; }); @@ -404,8 +405,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests connection.Closed += e => closedTcs.SetResult(null); await connection.StartAsync(); - channel.Output.TryWrite(Array.Empty()); - channel.Output.TryWrite(Array.Empty()); + channel.Out.TryWrite(Array.Empty()); + channel.Out.TryWrite(Array.Empty()); await allowDisposeTcs.Task.OrTimeout(); await connection.DisposeAsync(); Assert.Equal(2, receivedInvocationCount); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs index 685ba789d6..22f7a29306 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs @@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.Client.Tests { var connectionToTransport = Channel.CreateUnbounded(); var transportToConnection = Channel.CreateUnbounded(); - var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); + var channelConnection = ChannelConnection.Create(connectionToTransport, transportToConnection); await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection); await longPollingTransport.Running.OrTimeout(); diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs b/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs index 2575514352..08c0a5d5b3 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs @@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests { public async override Task OnConnectedAsync(ConnectionContext connection) { - await connection.Transport.Output.WriteAsync(await connection.Transport.Input.ReadAsync()); + await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync()); } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs index c21c90cdb2..37d01ea6ec 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs @@ -20,9 +20,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests private static int _id; private IHubProtocol _protocol; private CancellationTokenSource _cts; + private ChannelConnection _transport; public DefaultConnectionContext Connection { get; } - public IChannelConnection Application { get; } + public Channel Application { get; } public Task Connected => Connection.Metadata.Get>("ConnectedTask").Task; public TestClient() @@ -31,9 +32,9 @@ namespace Microsoft.AspNetCore.SignalR.Tests var applicationToTransport = Channel.CreateUnbounded(); Application = ChannelConnection.Create(input: applicationToTransport, output: transportToApplication); - var transport = ChannelConnection.Create(input: transportToApplication, output: applicationToTransport); + _transport = ChannelConnection.Create(input: transportToApplication, output: applicationToTransport); - Connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), transport, Application); + Connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), _transport, Application); Connection.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.Name, Interlocked.Increment(ref _id).ToString()) })); Connection.Metadata["ConnectedTask"] = new TaskCompletionSource(); @@ -110,7 +111,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests var invocationId = GetInvocationId(); var payload = _protocol.WriteToArray(new InvocationMessage(invocationId, nonBlocking: false, target: methodName, arguments: args)); - await Application.Output.WriteAsync(payload); + await Application.Out.WriteAsync(payload); return invocationId; } @@ -123,7 +124,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests if (message == null) { - if (!await Application.Input.WaitToReadAsync()) + if (!await Application.In.WaitToReadAsync()) { return null; } @@ -137,7 +138,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests public HubMessage TryRead() { - if (Application.Input.TryRead(out var buffer) && + if (Application.In.TryRead(out var buffer) && _protocol.TryParseMessages(buffer, this, out var messages)) { return messages[0]; @@ -148,7 +149,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests public void Dispose() { _cts.Cancel(); - Connection.Transport.Dispose(); + _transport.Dispose(); } private static string GetInvocationId() diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs index 0bb9bd5edd..6f054633d7 100644 --- a/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs @@ -81,12 +81,12 @@ namespace Microsoft.AspNetCore.Sockets.Tests connection.ApplicationTask = Task.Run(async () => { - Assert.False(await connection.Transport.Input.WaitToReadAsync()); + Assert.False(await connection.Transport.In.WaitToReadAsync()); }); connection.TransportTask = Task.Run(async () => { - Assert.False(await connection.Application.Input.WaitToReadAsync()); + Assert.False(await connection.Application.In.WaitToReadAsync()); }); connectionManager.CloseConnections(); diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs index c0d50215cc..297549971b 100644 --- a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs @@ -478,7 +478,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests var buffer = Encoding.UTF8.GetBytes("Hello World"); // Write to the transport so the poll yields - await connection.Transport.Output.WriteAsync(buffer); + await connection.Transport.Out.WriteAsync(buffer); await task; @@ -510,7 +510,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests var buffer = Encoding.UTF8.GetBytes("Hello World"); // Write to the application - await connection.Application.Output.WriteAsync(buffer); + await connection.Application.Out.WriteAsync(buffer); await task; @@ -540,7 +540,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests var buffer = Encoding.UTF8.GetBytes("Hello World"); // Write to the application - await connection.Application.Output.WriteAsync(buffer); + await connection.Application.Out.WriteAsync(buffer); await task; @@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests await task1.OrTimeout(); // Send a message from the app to complete Task 2 - await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")); + await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")); await task2.OrTimeout(); @@ -703,7 +703,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") })); var endPointTask = dispatcher.ExecuteAsync(context, options, app); - await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout(); + await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout(); await endPointTask.OrTimeout(); @@ -777,7 +777,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests })); var endPointTask = dispatcher.ExecuteAsync(context, options, app); - await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout(); + await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout(); await endPointTask.OrTimeout(); @@ -827,7 +827,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") })); var endPointTask = dispatcher.ExecuteAsync(context, options, app); - await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout(); + await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout(); await endPointTask.OrTimeout(); @@ -1036,7 +1036,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests { public override Task OnConnectedAsync(ConnectionContext connection) { - connection.Transport.Input.WaitToReadAsync().Wait(); + connection.Transport.In.WaitToReadAsync().Wait(); return Task.CompletedTask; } } @@ -1061,7 +1061,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests { public override async Task OnConnectedAsync(ConnectionContext connection) { - while (await connection.Transport.Input.WaitToReadAsync()) + while (await connection.Transport.In.WaitToReadAsync()) { } } diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs index 6d76940540..15d179fb11 100644 --- a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs @@ -25,8 +25,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory()); @@ -45,10 +45,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests cancellationToken: CancellationToken.None); await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); - var buffer = await applicationSide.Input.In.ReadAsync(); + var buffer = await applicationSide.In.ReadAsync(); Assert.Equal("Hello", Encoding.UTF8.GetString(buffer)); - Assert.True(applicationSide.Output.Out.TryComplete()); + Assert.True(applicationSide.Out.TryComplete()); // The transport should finish now await transport; @@ -68,8 +68,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var ws = new WebSocketsTransport(new WebSocketOptions() { WebSocketMessageType = webSocketMessageType }, transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory()); @@ -81,8 +81,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var client = feature.Client.ExecuteAndCaptureFramesAsync(); // Write to the output channel, and then complete it - await applicationSide.Output.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello")); - Assert.True(applicationSide.Output.Out.TryComplete()); + await applicationSide.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello")); + Assert.True(applicationSide.Out.TryComplete()); // The client should finish now, as should the server var clientSummary = await client; @@ -102,8 +102,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var options = new WebSocketOptions() @@ -136,8 +136,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory()); @@ -149,7 +149,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests var client = feature.Client.ExecuteAndCaptureFramesAsync(); // Fail in the app - Assert.True(applicationSide.Output.Out.TryComplete(new InvalidOperationException("Catastrophic failure."))); + Assert.True(applicationSide.Out.TryComplete(new InvalidOperationException("Catastrophic failure."))); var clientSummary = await client.OrTimeout(); Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus); @@ -167,8 +167,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var options = new WebSocketOptions() @@ -200,8 +200,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var options = new WebSocketOptions() @@ -233,8 +233,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var options = new WebSocketOptions() @@ -270,8 +270,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); - using (var transportSide = new ChannelConnection(applicationToTransport, transportToApplication)) - using (var applicationSide = new ChannelConnection(transportToApplication, applicationToTransport)) + using (var transportSide = ChannelConnection.Create(applicationToTransport, transportToApplication)) + using (var applicationSide = ChannelConnection.Create(transportToApplication, applicationToTransport)) using (var feature = new TestWebSocketConnectionFeature()) { var options = new WebSocketOptions()