Clean up client ITransport interface (#1893)

- Transports own the pipe creation. The fact they are 2 sides is an implementation detail.
This commit is contained in:
David Fowler 2018-04-07 15:11:42 -07:00 committed by GitHub
parent 86083c0302
commit e3da7feab4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 165 additions and 188 deletions

View File

@ -9,7 +9,9 @@ using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
using Microsoft.AspNetCore.Http.Connections.Features;
using Microsoft.AspNetCore.Http.Connections.Internal;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
@ -30,8 +32,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
private bool _started;
private bool _disposed;
private IDuplexPipe _transportPipe;
private readonly HttpClient _httpClient;
private readonly HttpOptions _httpOptions;
private ITransport _transport;
@ -49,11 +49,11 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
get
{
CheckDisposed();
if (_transportPipe == null)
if (_transport == null)
{
throw new InvalidOperationException($"Cannot access the {nameof(Transport)} pipe before the connection has started.");
}
return _transportPipe;
return _transport;
}
}
@ -153,7 +153,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
private async Task DisposeAsyncCore(Exception exception = null)
private async Task DisposeAsyncCore()
{
if (_disposed)
{
@ -167,10 +167,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
{
Log.DisposingHttpConnection(_logger);
// Complete our ends of the pipes.
_transportPipe.Input.Complete(exception);
_transportPipe.Output.Complete(exception);
// Stop the transport, but we don't care if it throws.
// The transport should also have completed the pipe with this exception.
try
@ -331,34 +327,30 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
private async Task StartTransport(Uri connectUrl, HttpTransportType transportType, TransferFormat transferFormat)
{
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
var options = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false, pauseWriterThreshold: 0, resumeWriterThreshold: 0);
var pair = DuplexPipe.CreateConnectionPair(options, options);
// Construct the transport
var transport = _transportFactory.CreateTransport(transportType);
// Start the transport, giving it one end of the pipe
try
{
await transport.StartAsync(connectUrl, pair.Application, transferFormat, this);
await transport.StartAsync(connectUrl, transferFormat);
}
catch (Exception ex)
{
Log.ErrorStartingTransport(_logger, transport, ex);
// Clean up pipes and null out transport when we fail to start.
pair.Transport.Input.Complete();
pair.Transport.Output.Complete();
pair.Application.Input.Complete();
pair.Application.Output.Complete();
_transport = null;
throw;
}
if (transportType == HttpTransportType.LongPolling)
{
// Disable keep alives for long polling
Features.Set<IConnectionInherentKeepAliveFeature>(new ConnectionInherentKeepAliveFeature(_httpClient.Timeout));
}
// We successfully started, set the transport properties (we don't want to set these until the transport is definitely running).
_transport = transport;
_transportPipe = pair.Transport;
Log.TransportStarted(_logger, _transport);
}

View File

@ -8,9 +8,9 @@ using Microsoft.AspNetCore.Connections;
namespace Microsoft.AspNetCore.Http.Connections.Client
{
public interface ITransport
public interface ITransport : IDuplexPipe
{
Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection);
Task StartAsync(Uri url, TransferFormat transferFormat);
Task StopAsync();
}
}

View File

@ -0,0 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
public static class ClientPipeOptions
{
public static PipeOptions DefaultOptions = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false, pauseWriterThreshold: 0, resumeWriterThreshold: 0);
}
}

View File

@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
private readonly HttpClient _httpClient;
private readonly ILogger _logger;
private IDuplexPipe _application;
private IDuplexPipe _transport;
// Volatile so that the poll loop sees the updated value set from a different thread
private volatile Exception _error;
@ -27,6 +28,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public Task Running { get; private set; } = Task.CompletedTask;
public PipeReader Input => _transport.Input;
public PipeWriter Output => _transport.Output;
public LongPollingTransport(HttpClient httpClient)
: this(httpClient, null)
{ }
@ -37,19 +42,22 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
}
public Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection)
public Task StartAsync(Uri url, TransferFormat transferFormat)
{
if (transferFormat != TransferFormat.Binary && transferFormat != TransferFormat.Text)
{
throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat));
}
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new ConnectionInherentKeepAliveFeature(_httpClient.Timeout));
_application = application;
Log.StartTransport(_logger, transferFormat);
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);
_transport = pair.Transport;
_application = pair.Application;
Running = ProcessAsync(url);
return Task.CompletedTask;
@ -89,6 +97,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
Log.TransportStopping(_logger);
_transport.Output.Complete();
_transport.Input.Complete();
_application.Input.CancelPendingRead();
try

View File

@ -22,11 +22,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
private volatile Exception _error;
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser();
private IDuplexPipe _transport;
private IDuplexPipe _application;
public Task Running { get; private set; } = Task.CompletedTask;
public PipeReader Input => _transport.Input;
public PipeWriter Output => _transport.Output;
public ServerSentEventsTransport(HttpClient httpClient)
: this(httpClient, null)
{ }
@ -42,15 +46,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
}
public async Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection)
public async Task StartAsync(Uri url, TransferFormat transferFormat)
{
if (transferFormat != TransferFormat.Text)
{
throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat));
}
_application = application;
Log.StartTransport(_logger, transferFormat);
var request = new HttpRequestMessage(HttpMethod.Get, url);
@ -72,6 +74,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
throw;
}
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);
_transport = pair.Transport;
_application = pair.Application;
Running = ProcessAsync(url, response);
}
@ -194,6 +203,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
Log.TransportStopping(_logger);
_transport.Output.Complete();
_transport.Input.Complete();
_application.Input.CancelPendingRead();
try

View File

@ -23,8 +23,14 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
private readonly TimeSpan _closeTimeout;
private volatile bool _aborted;
private IDuplexPipe _transport;
public Task Running { get; private set; } = Task.CompletedTask;
public PipeReader Input => _transport.Input;
public PipeWriter Output => _transport.Output;
public WebSocketsTransport()
: this(null, null)
{
@ -89,24 +95,18 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<WebSocketsTransport>();
}
public async Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection)
public async Task StartAsync(Uri url, TransferFormat transferFormat)
{
if (url == null)
{
throw new ArgumentNullException(nameof(url));
}
if (application == null)
{
throw new ArgumentNullException(nameof(application));
}
if (transferFormat != TransferFormat.Binary && transferFormat != TransferFormat.Text)
{
throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat));
}
_application = application;
_webSocketMessageType = transferFormat == TransferFormat.Binary
? WebSocketMessageType.Binary
: WebSocketMessageType.Text;
@ -117,6 +117,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
await _webSocket.ConnectAsync(resolvedUrl, CancellationToken.None);
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);
_transport = pair.Transport;
_application = pair.Application;
// TODO: Handle TCP connection errors
// https://github.com/SignalR/SignalR/blob/1fba14fa3437e24c204dfaf8a18db3fce8acad3c/src/Microsoft.AspNet.SignalR.Core/Owin/WebSockets/WebSocketHandler.cs#L248-L251
Running = ProcessSocketAsync(_webSocket);
@ -359,6 +366,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
Log.TransportStopping(_logger);
_transport.Output.Complete();
_transport.Input.Complete();
// Cancel any pending reads from the application, this should start the entire shutdown process
_application.Input.CancelPendingRead();

View File

@ -159,28 +159,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task PipesAreDisposedAfterTransportFailsToStart()
{
using (StartLog(out var loggerFactory))
{
var writerTcs = new TaskCompletionSource<object>();
var readerTcs = new TaskCompletionSource<object>();
await WithConnectionAsync(
CreateConnection(
loggerFactory: loggerFactory,
transport: new FakeTransport(writerTcs, readerTcs)),
async (connection) =>
{
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.StartAsync(TransferFormat.Text));
Assert.Equal("Unable to connect to the server with any of the available transports.", ex.Message);
Assert.True(writerTcs.Task.IsCompleted);
Assert.True(readerTcs.Task.IsCompleted);
});
}
}
[Fact]
public async Task CanDisposeUnstartedConnection()
{
@ -378,35 +356,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
private class FakeTransport : ITransport
{
private IDuplexPipe _application;
private TaskCompletionSource<object> _writerTcs;
private TaskCompletionSource<object> _readerTcs;
public FakeTransport(TaskCompletionSource<object> writerTcs, TaskCompletionSource<object> readerTcs)
{
_writerTcs = writerTcs;
_readerTcs = readerTcs;
}
public Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection)
{
_application = application;
Action<Exception, object> onCompletedCallback = (ex, tcs) => { ((TaskCompletionSource<object>)tcs).TrySetResult(null); };
_application.Input.OnWriterCompleted(onCompletedCallback, _writerTcs);
_application.Output.OnReaderCompleted(onCompletedCallback, _readerTcs);
throw new Exception();
}
public Task StopAsync()
{
_application.Output.Complete();
_application.Input.Complete();
return Task.CompletedTask;
}
}
private static async Task AssertDisposedAsync(HttpConnection connection)
{
var exception =

View File

@ -42,8 +42,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
transportActiveTask = longPollingTransport.Running;
@ -76,14 +75,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var longPollingTransport = new LongPollingTransport(httpClient);
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
await longPollingTransport.Running.OrTimeout();
Assert.True(pair.Transport.Input.TryRead(out var result));
Assert.True(longPollingTransport.Input.TryRead(out var result));
Assert.True(result.IsCompleted);
pair.Transport.Input.AdvanceTo(result.Buffer.End);
longPollingTransport.Input.AdvanceTo(result.Buffer.End);
}
finally
{
@ -130,10 +128,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var longPollingTransport = new LongPollingTransport(httpClient);
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
var data = await pair.Transport.Input.ReadAllAsync().OrTimeout();
var data = await longPollingTransport.Input.ReadAllAsync().OrTimeout();
await longPollingTransport.Running.OrTimeout();
Assert.Equal(Encoding.UTF8.GetBytes("HelloWorld"), data);
}
@ -161,15 +158,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var longPollingTransport = new LongPollingTransport(httpClient);
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
var exception =
await Assert.ThrowsAsync<HttpRequestException>(async () =>
{
async Task ReadAsync()
{
await pair.Transport.Input.ReadAsync();
await longPollingTransport.Input.ReadAsync();
}
await ReadAsync().OrTimeout();
@ -203,14 +199,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var longPollingTransport = new LongPollingTransport(httpClient);
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
await longPollingTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
await longPollingTransport.Running.OrTimeout();
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await pair.Transport.Input.ReadAllAsync().OrTimeout());
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await longPollingTransport.Input.ReadAllAsync().OrTimeout());
Assert.Contains(" 500 ", exception.Message);
}
finally
@ -237,14 +232,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var longPollingTransport = new LongPollingTransport(httpClient);
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
pair.Transport.Output.Complete();
longPollingTransport.Output.Complete();
await longPollingTransport.Running.OrTimeout();
await pair.Transport.Input.ReadAllAsync().OrTimeout();
await longPollingTransport.Input.ReadAllAsync().OrTimeout();
}
finally
{
@ -283,16 +277,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var longPollingTransport = new LongPollingTransport(httpClient);
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
// Start the transport
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
// Wait for the transport to finish
await longPollingTransport.Running.OrTimeout();
// Pull Messages out of the channel
var message = await pair.Transport.Input.ReadAllAsync();
var message = await longPollingTransport.Input.ReadAllAsync();
// Check the provided request
Assert.Equal(2, sentRequests.Count);
@ -331,19 +323,16 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var longPollingTransport = new LongPollingTransport(httpClient);
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
// Pre-queue some messages
await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("World"));
// Start the transport
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
pair.Transport.Output.Complete();
await longPollingTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
await longPollingTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("World"));
longPollingTransport.Output.Complete();
await longPollingTransport.Running.OrTimeout();
await pair.Transport.Input.ReadAllAsync();
await longPollingTransport.Input.ReadAllAsync();
Assert.Single(sentRequests);
Assert.Equal(new byte[] { (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o', (byte)'W', (byte)'o', (byte)'r', (byte)'l', (byte)'d'
@ -376,9 +365,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, transferFormat, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat);
}
finally
{
@ -405,7 +392,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var longPollingTransport = new LongPollingTransport(httpClient);
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), null, transferFormat, connection: new TestConnection()));
longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat));
Assert.Contains($"The '{transferFormat}' transfer format is not supported by this transport.", exception.Message);
Assert.Equal("transferFormat", exception.ParamName);
@ -440,8 +427,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: new TestConnection());
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary);
var completedTask = await Task.WhenAny(completionTcs.Task, longPollingTransport.Running).OrTimeout();
Assert.Equal(completionTcs.Task, completedTask);

View File

@ -50,9 +50,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of<IConnection>()).OrTimeout();
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
await eventStreamTcs.Task.OrTimeout();
await sseTransport.StopAsync().OrTimeout();
@ -68,7 +67,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact]
public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped()
{
var eventStreamCts = new CancellationTokenSource();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
@ -79,11 +77,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
{
await Task.Yield();
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
while (!eventStreamCts.IsCancellationRequested)
while (!t.IsCancellationRequested)
{
await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout();
await Task.Delay(100);
}
});
mockStream.Setup(s => s.CanRead).Returns(true);
@ -99,14 +97,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
try
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of<IConnection>()).OrTimeout();
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
transportActiveTask = sseTransport.Running;
Assert.False(transportActiveTask.IsCompleted);
var message = await pair.Transport.Input.ReadSingleAsync().OrTimeout();
var message = await sseTransport.Input.ReadSingleAsync().OrTimeout();
Assert.StartsWith("3:abc", Encoding.ASCII.GetString(message));
}
finally
@ -115,7 +111,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
await transportActiveTask.OrTimeout();
eventStreamCts.Cancel();
}
}
@ -146,11 +141,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of<IConnection>()).OrTimeout();
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
var exception = await Assert.ThrowsAsync<FormatException>(() => pair.Transport.Input.ReadAllAsync());
var exception = await Assert.ThrowsAsync<FormatException>(() => sseTransport.Input.ReadAllAsync());
await sseTransport.Running.OrTimeout();
@ -192,15 +186,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of<IConnection>()).OrTimeout();
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
await eventStreamTcs.Task;
await pair.Transport.Output.WriteAsync(new byte[] { 0x42 });
await sseTransport.Output.WriteAsync(new byte[] { 0x42 });
var exception = await Assert.ThrowsAsync<HttpRequestException>(() => pair.Transport.Input.ReadAllAsync().OrTimeout());
var exception = await Assert.ThrowsAsync<HttpRequestException>(() => sseTransport.Input.ReadAllAsync().OrTimeout());
Assert.Contains("500", exception.Message);
// Errors are only communicated through the pipe
@ -237,13 +229,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of<IConnection>()).OrTimeout();
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
await eventStreamTcs.Task.OrTimeout();
pair.Transport.Output.Complete();
sseTransport.Output.Complete();
await sseTransport.Running.OrTimeout();
}
@ -265,12 +255,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of<IConnection>()).OrTimeout();
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
var message = await pair.Transport.Input.ReadSingleAsync().OrTimeout();
var message = await sseTransport.Input.ReadSingleAsync().OrTimeout();
Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
await sseTransport.Running.OrTimeout();
@ -293,8 +281,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var ex = await Assert.ThrowsAsync<ArgumentException>(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Binary, connection: Mock.Of<IConnection>()).OrTimeout());
var ex = await Assert.ThrowsAsync<ArgumentException>(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary).OrTimeout());
Assert.Equal($"The 'Binary' transfer format is not supported by this transport.{Environment.NewLine}Parameter name: transferFormat", ex.Message);
}
}
@ -318,7 +305,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
sseTransport.StartAsync(new Uri("http://fakeuri.org"), null, transferFormat, connection: Mock.Of<IConnection>()));
sseTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat));
Assert.Contains($"The '{transferFormat}' transfer format is not supported by this transport.", exception.Message);
Assert.Equal("transferFormat", exception.ParamName);

View File

@ -3,6 +3,7 @@ using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
@ -15,6 +16,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public IDuplexPipe Application { get; private set; }
public Task Receiving { get; private set; }
private IDuplexPipe _transport;
public PipeReader Input => _transport.Input;
public PipeWriter Output => _transport.Output;
public TestTransport(Func<Task> onTransportStop = null, Func<Task> onTransportStart = null, TransferFormat transferFormat = TransferFormat.Text)
{
_stopHandler = onTransportStop ?? new Func<Task>(() => Task.CompletedTask);
@ -22,13 +29,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Format = transferFormat;
}
public async Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection)
public async Task StartAsync(Uri url, TransferFormat transferFormat)
{
if ((Format & transferFormat) == 0)
{
throw new InvalidOperationException($"The '{transferFormat}' transfer format is not supported by this transport.");
}
Application = application;
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);
_transport = pair.Transport;
Application = pair.Application;
await _startHandler();
// Start a loop to read from the pipe
@ -65,6 +77,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public Task StopAsync()
{
_transport.Output.Complete();
_transport.Input.Complete();
Application.Input.CancelPendingRead();
return Receiving;
}

View File

@ -13,6 +13,7 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.AspNetCore.WebUtilities;
@ -471,8 +472,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
private int _tries;
private string _prevConnectionId = null;
private IDuplexPipe _application;
private IDuplexPipe _transport;
private int availableTransports = 3;
public PipeReader Input => _transport.Input;
public PipeWriter Output => _transport.Output;
public FakeTransport()
{
if (!TestHelpers.IsWebSocketsSupported())
@ -481,9 +487,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
public Task StartAsync(Uri url, IDuplexPipe application, TransferFormat transferFormat, IConnection connection)
public Task StartAsync(Uri url, TransferFormat transferFormat)
{
_application = application;
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);
_transport = pair.Transport;
_application = pair.Application;
_tries++;
Assert.True(QueryHelpers.ParseQuery(url.Query).TryGetValue("id", out var id));
if (_prevConnectionId == null)

View File

@ -69,10 +69,9 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory))
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), pair.Application,
TransferFormat.Binary, connection: Mock.Of<IConnection>()).OrTimeout();
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"),
TransferFormat.Binary).OrTimeout();
await webSocketsTransport.StopAsync().OrTimeout();
await webSocketsTransport.Running.OrTimeout();
}
@ -84,17 +83,16 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory))
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"), pair.Application,
TransferFormat.Binary, connection: Mock.Of<IConnection>()).OrTimeout();
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"),
TransferFormat.Binary).OrTimeout();
await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("User-Agent"));
await webSocketsTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("User-Agent"));
// The HTTP header endpoint closes the connection immediately after sending response which should stop the transport
await webSocketsTransport.Running.OrTimeout();
Assert.True(pair.Transport.Input.TryRead(out var result));
Assert.True(webSocketsTransport.Input.TryRead(out var result));
var userAgent = Encoding.UTF8.GetString(result.Buffer.ToArray());
@ -113,17 +111,16 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory))
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"), pair.Application,
TransferFormat.Binary, connection: Mock.Of<IConnection>()).OrTimeout();
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/httpheader"),
TransferFormat.Binary).OrTimeout();
await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("X-Requested-With"));
await webSocketsTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("X-Requested-With"));
// The HTTP header endpoint closes the connection immediately after sending response which should stop the transport
await webSocketsTransport.Running.OrTimeout();
Assert.True(pair.Transport.Input.TryRead(out var result));
Assert.True(webSocketsTransport.Input.TryRead(out var result));
var headerValue = Encoding.UTF8.GetString(result.Buffer.ToArray());
@ -137,11 +134,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory))
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), pair.Application,
TransferFormat.Binary, connection: Mock.Of<IConnection>());
pair.Transport.Output.Complete();
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"),
TransferFormat.Binary);
webSocketsTransport.Output.Complete();
await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10));
}
}
@ -154,18 +150,17 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory))
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echoAndClose"), pair.Application, transferFormat, connection: Mock.Of<IConnection>());
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echoAndClose"), transferFormat);
await pair.Transport.Output.WriteAsync(new byte[] { 0x42 });
await webSocketsTransport.Output.WriteAsync(new byte[] { 0x42 });
// The echoAndClose endpoint closes the connection immediately after sending response which should stop the transport
await webSocketsTransport.Running.OrTimeout();
Assert.True(pair.Transport.Input.TryRead(out var result));
Assert.True(webSocketsTransport.Input.TryRead(out var result));
Assert.Equal(new byte[] { 0x42 }, result.Buffer.ToArray());
pair.Transport.Input.AdvanceTo(result.Buffer.End);
webSocketsTransport.Input.AdvanceTo(result.Buffer.End);
}
}
@ -177,11 +172,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory))
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), pair.Application,
transferFormat, connection: Mock.Of<IConnection>()).OrTimeout();
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"),
transferFormat).OrTimeout();
await webSocketsTransport.StopAsync().OrTimeout();
await webSocketsTransport.Running.OrTimeout();
@ -196,10 +190,9 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory))
{
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
webSocketsTransport.StartAsync(new Uri("http://fakeuri.org"), pair.Application, transferFormat, connection: Mock.Of<IConnection>()));
webSocketsTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat));
Assert.Contains($"The '{transferFormat}' transfer format is not supported by this transport.", exception.Message);
Assert.Equal("transferFormat", exception.ParamName);