diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs index d6ed13ee78..d9fe1790b8 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs @@ -9,7 +9,9 @@ using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Http.Connections.Client.Internal; +using Microsoft.AspNetCore.Http.Connections.Features; using Microsoft.AspNetCore.Http.Connections.Internal; using Microsoft.AspNetCore.Http.Features; using Microsoft.Extensions.Logging; @@ -30,8 +32,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client private bool _started; private bool _disposed; - private IDuplexPipe _transportPipe; - private readonly HttpClient _httpClient; private readonly HttpOptions _httpOptions; private ITransport _transport; @@ -49,11 +49,11 @@ namespace Microsoft.AspNetCore.Http.Connections.Client get { CheckDisposed(); - if (_transportPipe == null) + if (_transport == null) { throw new InvalidOperationException($"Cannot access the {nameof(Transport)} pipe before the connection has started."); } - return _transportPipe; + return _transport; } } @@ -153,7 +153,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync(); - private async Task DisposeAsyncCore(Exception exception = null) + private async Task DisposeAsyncCore() { if (_disposed) { @@ -167,10 +167,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client { Log.DisposingHttpConnection(_logger); - // Complete our ends of the pipes. - _transportPipe.Input.Complete(exception); - _transportPipe.Output.Complete(exception); - // Stop the transport, but we don't care if it throws. // The transport should also have completed the pipe with this exception. try @@ -331,34 +327,30 @@ namespace Microsoft.AspNetCore.Http.Connections.Client private async Task StartTransport(Uri connectUrl, HttpTransportType transportType, TransferFormat transferFormat) { - // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa) - var options = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false, pauseWriterThreshold: 0, resumeWriterThreshold: 0); - var pair = DuplexPipe.CreateConnectionPair(options, options); - // Construct the transport var transport = _transportFactory.CreateTransport(transportType); // Start the transport, giving it one end of the pipe try { - await transport.StartAsync(connectUrl, pair.Application, transferFormat, this); + await transport.StartAsync(connectUrl, transferFormat); } catch (Exception ex) { Log.ErrorStartingTransport(_logger, transport, ex); - // Clean up pipes and null out transport when we fail to start. - pair.Transport.Input.Complete(); - pair.Transport.Output.Complete(); - pair.Application.Input.Complete(); - pair.Application.Output.Complete(); _transport = null; throw; } + if (transportType == HttpTransportType.LongPolling) + { + // Disable keep alives for long polling + Features.Set(new ConnectionInherentKeepAliveFeature(_httpClient.Timeout)); + } + // We successfully started, set the transport properties (we don't want to set these until the transport is definitely running). _transport = transport; - _transportPipe = pair.Transport; Log.TransportStarted(_logger, _transport); } diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/ITransport.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/ITransport.cs index a86cc089fb..db657f88ed 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/ITransport.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/ITransport.cs @@ -8,9 +8,9 @@ using Microsoft.AspNetCore.Connections; namespace Microsoft.AspNetCore.Http.Connections.Client { - public interface ITransport + public interface ITransport : IDuplexPipe { - Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection); + Task StartAsync(Uri url, TransferFormat transferFormat); Task StopAsync(); } } diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ClientPipeOptions.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ClientPipeOptions.cs new file mode 100644 index 0000000000..a448404039 --- /dev/null +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ClientPipeOptions.cs @@ -0,0 +1,12 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.IO.Pipelines; + +namespace Microsoft.AspNetCore.Http.Connections.Client.Internal +{ + public static class ClientPipeOptions + { + public static PipeOptions DefaultOptions = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false, pauseWriterThreshold: 0, resumeWriterThreshold: 0); + } +} diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/LongPollingTransport.cs index ffb31489f4..cd68844808 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/LongPollingTransport.cs @@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal private readonly HttpClient _httpClient; private readonly ILogger _logger; private IDuplexPipe _application; + private IDuplexPipe _transport; // Volatile so that the poll loop sees the updated value set from a different thread private volatile Exception _error; @@ -27,6 +28,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal public Task Running { get; private set; } = Task.CompletedTask; + public PipeReader Input => _transport.Input; + + public PipeWriter Output => _transport.Output; + public LongPollingTransport(HttpClient httpClient) : this(httpClient, null) { } @@ -37,19 +42,22 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); } - public Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection) + public Task StartAsync(Uri url, TransferFormat transferFormat) { if (transferFormat != TransferFormat.Binary && transferFormat != TransferFormat.Text) { throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat)); } - connection.Features.Set(new ConnectionInherentKeepAliveFeature(_httpClient.Timeout)); - - _application = application; - Log.StartTransport(_logger, transferFormat); + // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa) + var options = ClientPipeOptions.DefaultOptions; + var pair = DuplexPipe.CreateConnectionPair(options, options); + + _transport = pair.Transport; + _application = pair.Application; + Running = ProcessAsync(url); return Task.CompletedTask; @@ -89,6 +97,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal { Log.TransportStopping(_logger); + _transport.Output.Complete(); + _transport.Input.Complete(); + _application.Input.CancelPendingRead(); try diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs index 029f56f1c8..ec126556de 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs @@ -22,11 +22,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal private volatile Exception _error; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser(); - + private IDuplexPipe _transport; private IDuplexPipe _application; public Task Running { get; private set; } = Task.CompletedTask; + public PipeReader Input => _transport.Input; + + public PipeWriter Output => _transport.Output; + public ServerSentEventsTransport(HttpClient httpClient) : this(httpClient, null) { } @@ -42,15 +46,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); } - public async Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection) + public async Task StartAsync(Uri url, TransferFormat transferFormat) { if (transferFormat != TransferFormat.Text) { throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat)); } - _application = application; - Log.StartTransport(_logger, transferFormat); var request = new HttpRequestMessage(HttpMethod.Get, url); @@ -72,6 +74,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal throw; } + // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa) + var options = ClientPipeOptions.DefaultOptions; + var pair = DuplexPipe.CreateConnectionPair(options, options); + + _transport = pair.Transport; + _application = pair.Application; + Running = ProcessAsync(url, response); } @@ -194,6 +203,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal { Log.TransportStopping(_logger); + _transport.Output.Complete(); + _transport.Input.Complete(); + _application.Input.CancelPendingRead(); try diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/WebSocketsTransport.cs index e3ef3e3309..c951f0fbb7 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/WebSocketsTransport.cs @@ -23,8 +23,14 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal private readonly TimeSpan _closeTimeout; private volatile bool _aborted; + private IDuplexPipe _transport; + public Task Running { get; private set; } = Task.CompletedTask; + public PipeReader Input => _transport.Input; + + public PipeWriter Output => _transport.Output; + public WebSocketsTransport() : this(null, null) { @@ -89,24 +95,18 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); } - public async Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection) + public async Task StartAsync(Uri url, TransferFormat transferFormat) { if (url == null) { throw new ArgumentNullException(nameof(url)); } - if (application == null) - { - throw new ArgumentNullException(nameof(application)); - } - if (transferFormat != TransferFormat.Binary && transferFormat != TransferFormat.Text) { throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat)); } - _application = application; _webSocketMessageType = transferFormat == TransferFormat.Binary ? WebSocketMessageType.Binary : WebSocketMessageType.Text; @@ -117,6 +117,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal await _webSocket.ConnectAsync(resolvedUrl, CancellationToken.None); + // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa) + var options = ClientPipeOptions.DefaultOptions; + var pair = DuplexPipe.CreateConnectionPair(options, options); + + _transport = pair.Transport; + _application = pair.Application; + // TODO: Handle TCP connection errors // https://github.com/SignalR/SignalR/blob/1fba14fa3437e24c204dfaf8a18db3fce8acad3c/src/Microsoft.AspNet.SignalR.Core/Owin/WebSockets/WebSocketHandler.cs#L248-L251 Running = ProcessSocketAsync(_webSocket); @@ -359,6 +366,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal { Log.TransportStopping(_logger); + _transport.Output.Complete(); + _transport.Input.Complete(); + // Cancel any pending reads from the application, this should start the entire shutdown process _application.Input.CancelPendingRead(); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.ConnectionLifecycle.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.ConnectionLifecycle.cs index fb6bf5d9c1..1d7ef85ac8 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.ConnectionLifecycle.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.ConnectionLifecycle.cs @@ -159,28 +159,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests } } - [Fact] - public async Task PipesAreDisposedAfterTransportFailsToStart() - { - using (StartLog(out var loggerFactory)) - { - var writerTcs = new TaskCompletionSource(); - var readerTcs = new TaskCompletionSource(); - await WithConnectionAsync( - CreateConnection( - loggerFactory: loggerFactory, - transport: new FakeTransport(writerTcs, readerTcs)), - async (connection) => - { - var ex = await Assert.ThrowsAsync(() => connection.StartAsync(TransferFormat.Text)); - Assert.Equal("Unable to connect to the server with any of the available transports.", ex.Message); - - Assert.True(writerTcs.Task.IsCompleted); - Assert.True(readerTcs.Task.IsCompleted); - }); - } - } - [Fact] public async Task CanDisposeUnstartedConnection() { @@ -378,35 +356,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests } } - private class FakeTransport : ITransport - { - private IDuplexPipe _application; - private TaskCompletionSource _writerTcs; - private TaskCompletionSource _readerTcs; - - public FakeTransport(TaskCompletionSource writerTcs, TaskCompletionSource readerTcs) - { - _writerTcs = writerTcs; - _readerTcs = readerTcs; - } - - public Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection) - { - _application = application; - Action onCompletedCallback = (ex, tcs) => { ((TaskCompletionSource)tcs).TrySetResult(null); }; - _application.Input.OnWriterCompleted(onCompletedCallback, _writerTcs); - _application.Output.OnReaderCompleted(onCompletedCallback, _readerTcs); - throw new Exception(); - } - - public Task StopAsync() - { - _application.Output.Complete(); - _application.Input.Complete(); - return Task.CompletedTask; - } - } - private static async Task AssertDisposedAsync(HttpConnection connection) { var exception = diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs index c4103d2e2a..e6826d3947 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs @@ -42,8 +42,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); transportActiveTask = longPollingTransport.Running; @@ -76,14 +75,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient); try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); await longPollingTransport.Running.OrTimeout(); - Assert.True(pair.Transport.Input.TryRead(out var result)); + Assert.True(longPollingTransport.Input.TryRead(out var result)); Assert.True(result.IsCompleted); - pair.Transport.Input.AdvanceTo(result.Buffer.End); + longPollingTransport.Input.AdvanceTo(result.Buffer.End); } finally { @@ -130,10 +128,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient); try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); - var data = await pair.Transport.Input.ReadAllAsync().OrTimeout(); + var data = await longPollingTransport.Input.ReadAllAsync().OrTimeout(); await longPollingTransport.Running.OrTimeout(); Assert.Equal(Encoding.UTF8.GetBytes("HelloWorld"), data); } @@ -161,15 +158,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient); try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); var exception = await Assert.ThrowsAsync(async () => { async Task ReadAsync() { - await pair.Transport.Input.ReadAsync(); + await longPollingTransport.Input.ReadAsync(); } await ReadAsync().OrTimeout(); @@ -203,14 +199,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient); try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); - await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World")); + await longPollingTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World")); await longPollingTransport.Running.OrTimeout(); - var exception = await Assert.ThrowsAsync(async () => await pair.Transport.Input.ReadAllAsync().OrTimeout()); + var exception = await Assert.ThrowsAsync(async () => await longPollingTransport.Input.ReadAllAsync().OrTimeout()); Assert.Contains(" 500 ", exception.Message); } finally @@ -237,14 +232,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient); try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); - pair.Transport.Output.Complete(); + longPollingTransport.Output.Complete(); await longPollingTransport.Running.OrTimeout(); - await pair.Transport.Input.ReadAllAsync().OrTimeout(); + await longPollingTransport.Input.ReadAllAsync().OrTimeout(); } finally { @@ -283,16 +277,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient); try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - // Start the transport - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); // Wait for the transport to finish await longPollingTransport.Running.OrTimeout(); // Pull Messages out of the channel - var message = await pair.Transport.Input.ReadAllAsync(); + var message = await longPollingTransport.Input.ReadAllAsync(); // Check the provided request Assert.Equal(2, sentRequests.Count); @@ -331,19 +323,16 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var longPollingTransport = new LongPollingTransport(httpClient); try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - - // Pre-queue some messages - await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello")); - await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("World")); - // Start the transport - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); - pair.Transport.Output.Complete(); + await longPollingTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello")); + await longPollingTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("World")); + + longPollingTransport.Output.Complete(); await longPollingTransport.Running.OrTimeout(); - await pair.Transport.Input.ReadAllAsync(); + await longPollingTransport.Input.ReadAllAsync(); Assert.Single(sentRequests); Assert.Equal(new byte[] { (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o', (byte)'W', (byte)'o', (byte)'r', (byte)'l', (byte)'d' @@ -376,9 +365,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, transferFormat, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat); } finally { @@ -405,7 +392,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { var longPollingTransport = new LongPollingTransport(httpClient); var exception = await Assert.ThrowsAsync(() => - longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), null, transferFormat, connection: new TestConnection())); + longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat)); Assert.Contains($"The '{transferFormat}' transfer format is not supported by this transport.", exception.Message); Assert.Equal("transferFormat", exception.ParamName); @@ -440,8 +427,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection()); + await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary); var completedTask = await Task.WhenAny(completionTcs.Task, longPollingTransport.Running).OrTimeout(); Assert.Equal(completionTcs.Task, completedTask); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs index e5e01fc707..1e572a944e 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs @@ -50,9 +50,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests using (var httpClient = new HttpClient(mockHttpHandler.Object)) { var sseTransport = new ServerSentEventsTransport(httpClient); - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); await sseTransport.StartAsync( - new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of()).OrTimeout(); + new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); await eventStreamTcs.Task.OrTimeout(); await sseTransport.StopAsync().OrTimeout(); @@ -68,7 +67,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests [Fact] public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped() { - var eventStreamCts = new CancellationTokenSource(); var mockHttpHandler = new Mock(); mockHttpHandler.Protected() .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) @@ -79,11 +77,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests .Setup(s => s.CopyToAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Returns(async (stream, bufferSize, t) => { - await Task.Yield(); var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n"); - while (!eventStreamCts.IsCancellationRequested) + while (!t.IsCancellationRequested) { await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout(); + await Task.Delay(100); } }); mockStream.Setup(s => s.CanRead).Returns(true); @@ -99,14 +97,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests try { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await sseTransport.StartAsync( - new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of()).OrTimeout(); + new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); transportActiveTask = sseTransport.Running; Assert.False(transportActiveTask.IsCompleted); - var message = await pair.Transport.Input.ReadSingleAsync().OrTimeout(); + var message = await sseTransport.Input.ReadSingleAsync().OrTimeout(); Assert.StartsWith("3:abc", Encoding.ASCII.GetString(message)); } finally @@ -115,7 +111,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests } await transportActiveTask.OrTimeout(); - eventStreamCts.Cancel(); } } @@ -146,11 +141,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { var sseTransport = new ServerSentEventsTransport(httpClient); - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); await sseTransport.StartAsync( - new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of()).OrTimeout(); + new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); - var exception = await Assert.ThrowsAsync(() => pair.Transport.Input.ReadAllAsync()); + var exception = await Assert.ThrowsAsync(() => sseTransport.Input.ReadAllAsync()); await sseTransport.Running.OrTimeout(); @@ -192,15 +186,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { var sseTransport = new ServerSentEventsTransport(httpClient); - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await sseTransport.StartAsync( - new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of()).OrTimeout(); + new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); await eventStreamTcs.Task; - await pair.Transport.Output.WriteAsync(new byte[] { 0x42 }); + await sseTransport.Output.WriteAsync(new byte[] { 0x42 }); - var exception = await Assert.ThrowsAsync(() => pair.Transport.Input.ReadAllAsync().OrTimeout()); + var exception = await Assert.ThrowsAsync(() => sseTransport.Input.ReadAllAsync().OrTimeout()); Assert.Contains("500", exception.Message); // Errors are only communicated through the pipe @@ -237,13 +229,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { var sseTransport = new ServerSentEventsTransport(httpClient); - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await sseTransport.StartAsync( - new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of()).OrTimeout(); + new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); await eventStreamTcs.Task.OrTimeout(); - pair.Transport.Output.Complete(); + sseTransport.Output.Complete(); await sseTransport.Running.OrTimeout(); } @@ -265,12 +255,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { var sseTransport = new ServerSentEventsTransport(httpClient); - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - await sseTransport.StartAsync( - new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of()).OrTimeout(); + new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); - var message = await pair.Transport.Input.ReadSingleAsync().OrTimeout(); + var message = await sseTransport.Input.ReadSingleAsync().OrTimeout(); Assert.Equal("3:abc", Encoding.ASCII.GetString(message)); await sseTransport.Running.OrTimeout(); @@ -293,8 +281,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { var sseTransport = new ServerSentEventsTransport(httpClient); - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - var ex = await Assert.ThrowsAsync(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: Mock.Of()).OrTimeout()); + var ex = await Assert.ThrowsAsync(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary).OrTimeout()); Assert.Equal($"The 'Binary' transfer format is not supported by this transport.{Environment.NewLine}Parameter name: transferFormat", ex.Message); } } @@ -318,7 +305,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { var sseTransport = new ServerSentEventsTransport(httpClient); var exception = await Assert.ThrowsAsync(() => - sseTransport.StartAsync(new Uri("http://fakeuri.org"), null, transferFormat, connection: Mock.Of())); + sseTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat)); Assert.Contains($"The '{transferFormat}' transfer format is not supported by this transport.", exception.Message); Assert.Equal("transferFormat", exception.ParamName); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransport.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransport.cs index 18902e462a..44c2144a51 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransport.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransport.cs @@ -3,6 +3,7 @@ using System.IO.Pipelines; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http.Connections.Client; +using Microsoft.AspNetCore.Http.Connections.Client.Internal; namespace Microsoft.AspNetCore.SignalR.Client.Tests { @@ -15,6 +16,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests public IDuplexPipe Application { get; private set; } public Task Receiving { get; private set; } + private IDuplexPipe _transport; + + public PipeReader Input => _transport.Input; + + public PipeWriter Output => _transport.Output; + public TestTransport(Func onTransportStop = null, Func onTransportStart = null, TransferFormat transferFormat = TransferFormat.Text) { _stopHandler = onTransportStop ?? new Func(() => Task.CompletedTask); @@ -22,13 +29,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests Format = transferFormat; } - public async Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection) + public async Task StartAsync(Uri url, TransferFormat transferFormat) { if ((Format & transferFormat) == 0) { throw new InvalidOperationException($"The '{transferFormat}' transfer format is not supported by this transport."); } - Application = application; + + var options = ClientPipeOptions.DefaultOptions; + var pair = DuplexPipe.CreateConnectionPair(options, options); + + _transport = pair.Transport; + Application = pair.Application; await _startHandler(); // Start a loop to read from the pipe @@ -65,6 +77,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests public Task StopAsync() { + _transport.Output.Complete(); + _transport.Input.Complete(); + Application.Input.CancelPendingRead(); return Receiving; } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs index 8186af0960..38dd0ee325 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs @@ -13,6 +13,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http.Connections; using Microsoft.AspNetCore.Http.Connections.Client; +using Microsoft.AspNetCore.Http.Connections.Client.Internal; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.AspNetCore.Testing.xunit; using Microsoft.AspNetCore.WebUtilities; @@ -471,8 +472,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests private int _tries; private string _prevConnectionId = null; private IDuplexPipe _application; + private IDuplexPipe _transport; private int availableTransports = 3; + public PipeReader Input => _transport.Input; + + public PipeWriter Output => _transport.Output; + public FakeTransport() { if (!TestHelpers.IsWebSocketsSupported()) @@ -481,9 +487,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } - public Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection) + public Task StartAsync(Uri url, TransferFormat transferFormat) { - _application = application; + var options = ClientPipeOptions.DefaultOptions; + var pair = DuplexPipe.CreateConnectionPair(options, options); + + _transport = pair.Transport; + _application = pair.Application; _tries++; Assert.True(QueryHelpers.ParseQuery(url.Query).TryGetValue("id", out var id)); if (_prevConnectionId == null) diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs index 489f4a4757..02d4b1144f 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs @@ -69,10 +69,9 @@ namespace Microsoft.AspNetCore.SignalR.Tests { using (StartLog(out var loggerFactory)) { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory); - await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), pair.Application, - TransferFormat.Binary, connection: Mock.Of()).OrTimeout(); + await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), + TransferFormat.Binary).OrTimeout(); await webSocketsTransport.StopAsync().OrTimeout(); await webSocketsTransport.Running.OrTimeout(); } @@ -84,17 +83,16 @@ namespace Microsoft.AspNetCore.SignalR.Tests { using (StartLog(out var loggerFactory)) { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory); - await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"), pair.Application, - TransferFormat.Binary, connection: Mock.Of()).OrTimeout(); + await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"), + TransferFormat.Binary).OrTimeout(); - await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("User-Agent")); + await webSocketsTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("User-Agent")); // The HTTP header endpoint closes the connection immediately after sending response which should stop the transport await webSocketsTransport.Running.OrTimeout(); - Assert.True(pair.Transport.Input.TryRead(out var result)); + Assert.True(webSocketsTransport.Input.TryRead(out var result)); var userAgent = Encoding.UTF8.GetString(result.Buffer.ToArray()); @@ -113,17 +111,16 @@ namespace Microsoft.AspNetCore.SignalR.Tests { using (StartLog(out var loggerFactory)) { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory); - await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"), pair.Application, - TransferFormat.Binary, connection: Mock.Of()).OrTimeout(); + await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"), + TransferFormat.Binary).OrTimeout(); - await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("X-Requested-With")); + await webSocketsTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("X-Requested-With")); // The HTTP header endpoint closes the connection immediately after sending response which should stop the transport await webSocketsTransport.Running.OrTimeout(); - Assert.True(pair.Transport.Input.TryRead(out var result)); + Assert.True(webSocketsTransport.Input.TryRead(out var result)); var headerValue = Encoding.UTF8.GetString(result.Buffer.ToArray()); @@ -137,11 +134,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests { using (StartLog(out var loggerFactory)) { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory); - await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), pair.Application, - TransferFormat.Binary, connection: Mock.Of()); - pair.Transport.Output.Complete(); + await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), + TransferFormat.Binary); + webSocketsTransport.Output.Complete(); await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10)); } } @@ -154,18 +150,17 @@ namespace Microsoft.AspNetCore.SignalR.Tests { using (StartLog(out var loggerFactory)) { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory); - await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echoAndClose"), pair.Application, transferFormat, connection: Mock.Of()); + await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echoAndClose"), transferFormat); - await pair.Transport.Output.WriteAsync(new byte[] { 0x42 }); + await webSocketsTransport.Output.WriteAsync(new byte[] { 0x42 }); // The echoAndClose endpoint closes the connection immediately after sending response which should stop the transport await webSocketsTransport.Running.OrTimeout(); - Assert.True(pair.Transport.Input.TryRead(out var result)); + Assert.True(webSocketsTransport.Input.TryRead(out var result)); Assert.Equal(new byte[] { 0x42 }, result.Buffer.ToArray()); - pair.Transport.Input.AdvanceTo(result.Buffer.End); + webSocketsTransport.Input.AdvanceTo(result.Buffer.End); } } @@ -177,11 +172,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests { using (StartLog(out var loggerFactory)) { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory); - await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), pair.Application, - transferFormat, connection: Mock.Of()).OrTimeout(); + await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), + transferFormat).OrTimeout(); await webSocketsTransport.StopAsync().OrTimeout(); await webSocketsTransport.Running.OrTimeout(); @@ -196,10 +190,9 @@ namespace Microsoft.AspNetCore.SignalR.Tests { using (StartLog(out var loggerFactory)) { - var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory); var exception = await Assert.ThrowsAsync(() => - webSocketsTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, transferFormat, connection: Mock.Of())); + webSocketsTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat)); Assert.Contains($"The '{transferFormat}' transfer format is not supported by this transport.", exception.Message); Assert.Equal("transferFormat", exception.ParamName);