clean up HttpConnectionTests (#1208)

This commit is contained in:
Andrew Stanton-Nurse 2017-12-19 11:12:39 -08:00 committed by GitHub
parent 00a6dc983a
commit 3bb71255d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1159 additions and 1315 deletions

View File

@ -2,13 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Client.Tests;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
@ -19,10 +13,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public class AbortAsync
{
[Fact]
public async Task AbortAsyncTriggersClosedEventWithException()
public Task AbortAsyncTriggersClosedEventWithException()
{
var connection = CreateConnection(out var closedTask);
try
return WithConnectionAsync(CreateConnection(), async (connection, closed) =>
{
// Start the connection
await connection.StartAsync().OrTimeout();
@ -32,22 +25,15 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await connection.AbortAsync(expected).OrTimeout();
// Verify that it is thrown
var actual = await Assert.ThrowsAsync<Exception>(async () => await closedTask.OrTimeout());
var actual = await Assert.ThrowsAsync<Exception>(async () => await closed.OrTimeout());
Assert.Same(expected, actual);
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
});
}
[Fact]
public async Task AbortAsyncWhileStoppingTriggersClosedEventWithException()
public Task AbortAsyncWhileStoppingTriggersClosedEventWithException()
{
var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(2, out var syncPoints));
try
return WithConnectionAsync(CreateConnection(transport: new TestTransport(onTransportStop: SyncPoint.Create(2, out var syncPoints))), async (connection, closed) =>
{
// Start the connection
await connection.StartAsync().OrTimeout();
@ -69,26 +55,19 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
syncPoints[0].Continue();
// We should close with the error from Abort (because it was set by the call to Abort even though Stop triggered the close)
var actual = await Assert.ThrowsAsync<Exception>(async () => await closedTask.OrTimeout());
var actual = await Assert.ThrowsAsync<Exception>(async () => await closed.OrTimeout());
Assert.Same(expected, actual);
// Clean-up
syncPoints[1].Continue();
await Task.WhenAll(stopTask, abortTask).OrTimeout();
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
});
}
[Fact]
public async Task StopAsyncWhileAbortingTriggersClosedEventWithoutException()
public Task StopAsyncWhileAbortingTriggersClosedEventWithoutException()
{
var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(2, out var syncPoints));
try
return WithConnectionAsync(CreateConnection(transport: new TestTransport(onTransportStop: SyncPoint.Create(2, out var syncPoints))), async (connection, closed) =>
{
// Start the connection
await connection.StartAsync().OrTimeout();
@ -104,25 +83,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// This should clear the exception, meaning Closed will not "throw"
syncPoints[1].Continue();
await connection.StopAsync();
await closedTask.OrTimeout();
await closed.OrTimeout();
// Clean-up
syncPoints[0].Continue();
await abortTask.OrTimeout();
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
});
}
[Fact]
public async Task StartAsyncCannotBeCalledWhileAbortAsyncInProgress()
public Task StartAsyncCannotBeCalledWhileAbortAsyncInProgress()
{
var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(out var syncPoint));
try
return WithConnectionAsync(CreateConnection(transport: new TestTransport(onTransportStop: SyncPoint.Create(out var syncPoint))), async (connection, closed) =>
{
// Start the connection
await connection.StartAsync().OrTimeout();
@ -141,7 +113,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// (it will throw the abort exception)
syncPoint.Continue();
await abortTask.OrTimeout();
var actual = await Assert.ThrowsAsync<Exception>(() => closedTask.OrTimeout());
var actual = await Assert.ThrowsAsync<Exception>(() => closed.OrTimeout());
Assert.Same(expected, actual);
// We can start now
@ -149,126 +121,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// And we can stop without getting the abort exception.
await connection.StopAsync().OrTimeout();
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
}
private HttpConnection CreateConnection(out Task closedTask, Func<Task> stopHandler = null)
{
var httpHandler = new TestHttpMessageHandler();
var transportFactory = new TestTransportFactory(new TestTransport(stopHandler));
var connection = new HttpConnection(
new Uri("http://fakeuri.org/"),
transportFactory,
NullLoggerFactory.Instance,
new HttpOptions()
{
HttpMessageHandler = httpHandler,
});
var closedTcs = new TaskCompletionSource<object>();
connection.Closed += ex =>
{
if (ex != null)
{
closedTcs.SetException(ex);
}
else
{
closedTcs.SetResult(null);
}
};
closedTask = closedTcs.Task;
return connection;
}
private class TestTransport : ITransport
{
private Channel<byte[], SendMessage> _application;
private readonly Func<Task> _stopHandler;
public TransferMode? Mode => TransferMode.Text;
public TestTransport(Func<Task> stopHandler)
{
_stopHandler = stopHandler ?? new Func<Task>(() => Task.CompletedTask);
}
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
{
_application = application;
return Task.CompletedTask;
}
public async Task StopAsync()
{
await _stopHandler();
_application.Writer.TryComplete();
}
}
// Possibly useful as a general-purpose async testing helper?
private class SyncPoint
{
private TaskCompletionSource<object> _atSyncPoint = new TaskCompletionSource<object>();
private TaskCompletionSource<object> _continueFromSyncPoint = new TaskCompletionSource<object>();
// Used by the test code to wait and continue
public Task WaitForSyncPoint() => _atSyncPoint.Task;
public void Continue() => _continueFromSyncPoint.TrySetResult(null);
// Used by the code under test to wait for the test code to release it.
public Task WaitToContinue()
{
_atSyncPoint.TrySetResult(null);
return _continueFromSyncPoint.Task;
}
public static Func<Task> Create(out SyncPoint syncPoint)
{
var handler = Create(1, out var syncPoints);
syncPoint = syncPoints[0];
return handler;
}
/// <summary>
/// Creates a re-entrant function that waits for sync points in sequence.
/// </summary>
/// <param name="count">The number of sync points to expect</param>
/// <param name="syncPoints">The <see cref="SyncPoint"/> objects that can be used to coordinate the sync point</param>
/// <returns></returns>
public static Func<Task> Create(int count, out SyncPoint[] syncPoints)
{
// Need to use a local so the closure can capture it. You can't use out vars in a closure.
var localSyncPoints = new SyncPoint[count];
for (var i = 0; i < count; i += 1)
{
localSyncPoints[i] = new SyncPoint();
}
syncPoints = localSyncPoints;
var counter = 0;
return () =>
{
if (counter >= localSyncPoints.Length)
{
return Task.CompletedTask;
}
else
{
var syncPoint = localSyncPoints[counter];
counter += 1;
return syncPoint.WaitToContinue();
}
};
}
});
}
}
}

View File

@ -0,0 +1,369 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.Extensions.Logging.Testing;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public partial class HttpConnectionTests
{
public class ConnectionLifecycle : LoggedTest
{
public ConnectionLifecycle(ITestOutputHelper output) : base(output)
{
}
[Fact]
public async Task CannotStartRunningConnection()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(CreateConnection(loggerFactory: loggerFactory), async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync().OrTimeout());
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
});
}
}
[Fact]
public async Task CannotStartConnectionDisposedAfterStarting()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(loggerFactory: loggerFactory),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync().OrTimeout());
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
});
}
}
[Fact]
public async Task CannotStartDisposedConnection()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(loggerFactory: loggerFactory),
async (connection, closed) =>
{
await connection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync().OrTimeout());
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
});
}
}
[Fact]
public async Task CanDisposeStartingConnection()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(
loggerFactory: loggerFactory,
transport: new TestTransport(
onTransportStart: SyncPoint.Create(out var transportStart),
onTransportStop: SyncPoint.Create(out var transportStop))),
async (connection, closed) =>
{
// Start the connection and wait for the transport to start up.
var startTask = connection.StartAsync();
await transportStart.WaitForSyncPoint().OrTimeout();
// While the transport is starting, dispose the connection
var disposeTask = connection.DisposeAsync();
transportStart.Continue(); // We need to release StartAsync, because Dispose waits for it.
// Wait for start to finish, as that has to finish before the transport will be stopped.
await startTask.OrTimeout();
// Then release DisposeAsync (via the transport StopAsync call)
await transportStop.WaitForSyncPoint().OrTimeout();
transportStop.Continue();
});
}
}
[Fact]
public async Task CanStartConnectionThatFailedToStart()
{
using (StartLog(out var loggerFactory))
{
var expected = new Exception("Transport failed to start");
var shouldFail = true;
Task OnTransportStart()
{
if (shouldFail)
{
// Succeed next time
shouldFail = false;
return Task.FromException(expected);
}
else
{
return Task.CompletedTask;
}
}
await WithConnectionAsync(
CreateConnection(
loggerFactory: loggerFactory,
transport: new TestTransport(onTransportStart: OnTransportStart)),
async (connection, closed) =>
{
var actual = await Assert.ThrowsAsync<Exception>(() => connection.StartAsync());
Assert.Same(expected, actual);
// Should succeed this time
shouldFail = false;
await connection.StartAsync().OrTimeout();
});
}
}
[Fact]
public async Task CanStartStoppedConnection()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(loggerFactory: loggerFactory),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await connection.StopAsync().OrTimeout();
await connection.StartAsync().OrTimeout();
});
}
}
[Fact]
public async Task CanStopStartingConnection()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(
loggerFactory: loggerFactory,
transport: new TestTransport(onTransportStart: SyncPoint.Create(out var transportStart))),
async (connection, closed) =>
{
// Start and wait for the transport to start up.
var startTask = connection.StartAsync();
await transportStart.WaitForSyncPoint().OrTimeout();
// Stop the connection while it's starting
var stopTask = connection.StopAsync();
transportStart.Continue(); // We need to release Start in order for Stop to begin working.
// Wait for start to finish, which will allow stop to finish and the connection to close.
await startTask.OrTimeout();
await stopTask.OrTimeout();
await closed.OrTimeout();
});
}
}
[Fact]
public async Task StoppingStoppingConnectionNoOps()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(loggerFactory: loggerFactory),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await Task.WhenAll(connection.StopAsync(), connection.StopAsync()).OrTimeout();
await closed.OrTimeout();
});
}
}
[Fact]
public async Task CanStartConnectionAfterConnectionStoppedWithError()
{
using (StartLog(out var loggerFactory))
{
var httpHandler = new TestHttpMessageHandler();
var longPollResult = new TaskCompletionSource<HttpResponseMessage>();
httpHandler.OnLongPoll(cancellationToken => longPollResult.Task.OrTimeout());
httpHandler.OnSocketSend((data, _) =>
{
Assert.Collection(data, i => Assert.Equal(0x42, i));
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError));
});
await WithConnectionAsync(
CreateConnection(httpHandler, loggerFactory),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await Assert.ThrowsAsync<HttpRequestException>(() => connection.SendAsync(new byte[] { 0x42 }).OrTimeout());
longPollResult.TrySetResult(ResponseUtils.CreateResponse(HttpStatusCode.NoContent));
// Wait for the connection to close, because the send failed.
await Assert.ThrowsAsync<HttpRequestException>(() => closed.OrTimeout());
// Start it up again
await connection.StartAsync().OrTimeout();
});
}
}
[Fact]
public async Task DisposedStoppingConnectionDisposesConnection()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(
loggerFactory: loggerFactory,
transport: new TestTransport(onTransportStop: SyncPoint.Create(out var transportStop))),
async (connection, closed) =>
{
// Start the connection
await connection.StartAsync().OrTimeout();
// Stop the connection
var stopTask = connection.StopAsync().OrTimeout();
// Once the transport starts shutting down
await transportStop.WaitForSyncPoint();
// Start disposing and allow it to finish shutting down
var disposeTask = connection.DisposeAsync().OrTimeout();
transportStop.Continue();
// Wait for the tasks to complete
await stopTask.OrTimeout();
await closed.OrTimeout();
await disposeTask.OrTimeout();
// We should be disposed and thus unable to restart.
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.StartAsync().OrTimeout());
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
});
}
}
[Fact]
public async Task CanDisposeStoppedConnection()
{
using (StartLog(out var loggerFactory))
{
await WithConnectionAsync(
CreateConnection(loggerFactory: loggerFactory),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await connection.StopAsync().OrTimeout();
await closed.OrTimeout();
await connection.DisposeAsync().OrTimeout();
});
}
}
[Fact]
public Task ClosedEventRaisedWhenTheClientIsDisposed()
{
return WithConnectionAsync(
CreateConnection(),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
await closed.OrTimeout();
});
}
[Fact]
public async Task ConnectionClosedWhenTransportFails()
{
var testTransport = new TestTransport();
var expected = new Exception("Whoops!");
await WithConnectionAsync(
CreateConnection(transport: testTransport),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
testTransport.Application.Writer.TryComplete(expected);
var actual = await Assert.ThrowsAsync<Exception>(() => closed.OrTimeout());
Assert.Same(expected, actual);
var sendException = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync(new byte[0]).OrTimeout());
Assert.Equal("Cannot send messages when the connection is not in the Connected state.", sendException.Message);
});
}
[Fact]
public Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted()
{
return WithConnectionAsync(
CreateConnection(),
async (connection, closed) =>
{
await connection.DisposeAsync().OrTimeout();
Assert.False(closed.IsCompleted);
});
}
[Fact]
public async Task TransportIsStoppedWhenConnectionIsStopped()
{
var testHttpHandler = new TestHttpMessageHandler();
// Just keep returning data when polled
testHttpHandler.OnLongPoll(_ => ResponseUtils.CreateResponse(HttpStatusCode.OK));
using (var httpClient = new HttpClient(testHttpHandler))
{
var longPollingTransport = new LongPollingTransport(httpClient);
await WithConnectionAsync(
CreateConnection(transport: longPollingTransport),
async (connection, closed) =>
{
// Start the transport
await connection.StartAsync().OrTimeout();
Assert.False(longPollingTransport.Running.IsCompleted, "Expected that the transport would still be running");
// Stop the connection, and we should stop the transport
await connection.StopAsync().OrTimeout();
await longPollingTransport.Running.OrTimeout();
});
}
}
}
}
}

View File

@ -0,0 +1,131 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public partial class HttpConnectionTests
{
private static HttpConnection CreateConnection(HttpMessageHandler httpHandler = null, ILoggerFactory loggerFactory = null, string url = null, ITransport transport = null)
{
loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
var httpOptions = new HttpOptions()
{
HttpMessageHandler = httpHandler ?? TestHttpMessageHandler.CreateDefault(),
};
var uri = new Uri(url ?? "http://fakeuri.org/");
var connection = (transport != null) ?
new HttpConnection(uri, new TestTransportFactory(transport), loggerFactory, httpOptions) :
new HttpConnection(uri, TransportType.LongPolling, loggerFactory, httpOptions);
return connection;
}
private static async Task WithConnectionAsync(HttpConnection connection, Func<HttpConnection, Task, Task> body)
{
try
{
var closedTcs = new TaskCompletionSource<object>();
connection.Closed += ex =>
{
if (ex != null)
{
closedTcs.SetException(ex);
}
else
{
closedTcs.SetResult(null);
}
};
// Using OrTimeout here will hide any timeout issues in the test :(.
await body(connection, closedTcs.Task);
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
// Possibly useful as a general-purpose async testing helper?
private class SyncPoint
{
private TaskCompletionSource<object> _atSyncPoint = new TaskCompletionSource<object>();
private TaskCompletionSource<object> _continueFromSyncPoint = new TaskCompletionSource<object>();
/// <summary>
/// Waits for the code-under-test to reach <see cref="WaitToContinue"/>.
/// </summary>
/// <returns></returns>
public Task WaitForSyncPoint() => _atSyncPoint.Task;
/// <summary>
/// Releases the code-under-test to continue past where it waited for <see cref="WaitToContinue"/>.
/// </summary>
public void Continue() => _continueFromSyncPoint.TrySetResult(null);
/// <summary>
/// Used by the code-under-test to wait for the test code to sync up.
/// </summary>
/// <remarks>
/// This code will unblock <see cref="WaitForSyncPoint"/> and then block waiting for <see cref="Continue"/> to be called.
/// </remarks>
/// <returns></returns>
public Task WaitToContinue()
{
_atSyncPoint.TrySetResult(null);
return _continueFromSyncPoint.Task;
}
public static Func<Task> Create(out SyncPoint syncPoint)
{
var handler = Create(1, out var syncPoints);
syncPoint = syncPoints[0];
return handler;
}
/// <summary>
/// Creates a re-entrant function that waits for sync points in sequence.
/// </summary>
/// <param name="count">The number of sync points to expect</param>
/// <param name="syncPoints">The <see cref="SyncPoint"/> objects that can be used to coordinate the sync point</param>
/// <returns></returns>
public static Func<Task> Create(int count, out SyncPoint[] syncPoints)
{
// Need to use a local so the closure can capture it. You can't use out vars in a closure.
var localSyncPoints = new SyncPoint[count];
for (var i = 0; i < count; i += 1)
{
localSyncPoints[i] = new SyncPoint();
}
syncPoints = localSyncPoints;
var counter = 0;
return () =>
{
if (counter >= localSyncPoints.Length)
{
return Task.CompletedTask;
}
else
{
var syncPoint = localSyncPoints[counter];
counter += 1;
return syncPoint.WaitToContinue();
}
};
}
}
}
}

View File

@ -0,0 +1,94 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
using Xunit;
using TransportType = Microsoft.AspNetCore.Sockets.TransportType;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public partial class HttpConnectionTests
{
public class Negotiate
{
[Theory]
[InlineData("")]
[InlineData("Not Json")]
public Task StartThrowsFormatExceptionIfNegotiationResponseIsInvalid(string negotiatePayload)
{
return RunInvalidNegotiateResponseTest<FormatException>(negotiatePayload, "Invalid negotiation response received.");
}
[Fact]
public Task StartThrowsFormatExceptionIfNegotiationResponseHasNoConnectionId()
{
return RunInvalidNegotiateResponseTest<FormatException>(ResponseUtils.CreateNegotiationContent(connectionId: null), "Invalid connection id returned in negotiation response.");
}
[Fact]
public Task StartThrowsFormatExceptionIfNegotiationResponseHasNoTransports()
{
return RunInvalidNegotiateResponseTest<FormatException>(ResponseUtils.CreateNegotiationContent(transportTypes: null), "No transports returned in negotiation response.");
}
[Theory]
[InlineData((TransportType)0)]
[InlineData(TransportType.ServerSentEvents)]
public Task ConnectionCannotBeStartedIfNoCommonTransportsBetweenClientAndServer(TransportType serverTransports)
{
return RunInvalidNegotiateResponseTest<InvalidOperationException>(ResponseUtils.CreateNegotiationContent(transportTypes: serverTransports), "No requested transports available on the server.");
}
[Theory]
[InlineData("http://fakeuri.org/", "http://fakeuri.org/negotiate")]
[InlineData("http://fakeuri.org/?q=1/0", "http://fakeuri.org/negotiate?q=1/0")]
[InlineData("http://fakeuri.org?q=1/0", "http://fakeuri.org/negotiate?q=1/0")]
[InlineData("http://fakeuri.org/endpoint", "http://fakeuri.org/endpoint/negotiate")]
[InlineData("http://fakeuri.org/endpoint/", "http://fakeuri.org/endpoint/negotiate")]
[InlineData("http://fakeuri.org/endpoint?q=1/0", "http://fakeuri.org/endpoint/negotiate?q=1/0")]
public async Task CorrectlyHandlesQueryStringWhenAppendingNegotiateToUrl(string requestedUrl, string expectedNegotiate)
{
var testHttpHandler = new TestHttpMessageHandler(autoNegotiate: false);
var negotiateUrlTcs = new TaskCompletionSource<string>();
testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.NoContent));
testHttpHandler.OnNegotiate((request, cancellationToken) =>
{
negotiateUrlTcs.TrySetResult(request.RequestUri.ToString());
return ResponseUtils.CreateResponse(HttpStatusCode.OK,
ResponseUtils.CreateNegotiationContent());
});
await WithConnectionAsync(
CreateConnection(testHttpHandler, url: requestedUrl),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
});
Assert.Equal(expectedNegotiate, await negotiateUrlTcs.Task.OrTimeout());
}
private async Task RunInvalidNegotiateResponseTest<TException>(string negotiatePayload, string expectedExceptionMessage) where TException : Exception
{
var testHttpHandler = new TestHttpMessageHandler(autoNegotiate: false);
testHttpHandler.OnNegotiate((_, cancellationToken) => ResponseUtils.CreateResponse(HttpStatusCode.OK, negotiatePayload));
await WithConnectionAsync(
CreateConnection(testHttpHandler),
async (connection, closed) =>
{
var exception = await Assert.ThrowsAsync<TException>(
() => connection.StartAsync().OrTimeout());
Assert.Equal(expectedExceptionMessage, exception.Message);
});
}
}
}
}

View File

@ -0,0 +1,107 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public partial class HttpConnectionTests
{
public class OnReceived
{
[Fact]
public async Task CanReceiveData()
{
var testHttpHandler = new TestHttpMessageHandler();
testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.OK, "42"));
testHttpHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted));
await WithConnectionAsync(
CreateConnection(testHttpHandler),
async (connection, closed) =>
{
var receiveTcs = new TaskCompletionSource<string>();
connection.OnReceived((data, state) =>
{
var tcs = ((TaskCompletionSource<string>)state);
tcs.TrySetResult(Encoding.UTF8.GetString(data));
return Task.CompletedTask;
}, receiveTcs);
await connection.StartAsync().OrTimeout();
Assert.Equal("42", await receiveTcs.Task.OrTimeout());
});
}
[Fact]
public async Task CanReceiveDataEvenIfExceptionThrownFromPreviousReceivedEvent()
{
var testHttpHandler = new TestHttpMessageHandler();
testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.OK, "42"));
testHttpHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted));
await WithConnectionAsync(
CreateConnection(testHttpHandler),
async (connection, closed) =>
{
var receiveTcs = new TaskCompletionSource<string>();
var receivedRaised = false;
connection.OnReceived((data, state) =>
{
if (!receivedRaised)
{
receivedRaised = true;
return Task.FromException(new InvalidOperationException());
}
receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
return Task.CompletedTask;
}, receiveTcs);
await connection.StartAsync().OrTimeout();
Assert.Equal("42", await receiveTcs.Task.OrTimeout());
Assert.True(receivedRaised);
});
}
[Fact]
public async Task CanReceiveDataEvenIfExceptionThrownSynchronouslyFromPreviousReceivedEvent()
{
var testHttpHandler = new TestHttpMessageHandler();
testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.OK, "42"));
testHttpHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted));
await WithConnectionAsync(
CreateConnection(testHttpHandler),
async (connection, closed) =>
{
var receiveTcs = new TaskCompletionSource<string>();
var receivedRaised = false;
connection.OnReceived((data, state) =>
{
if (!receivedRaised)
{
receivedRaised = true;
throw new InvalidOperationException();
}
receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
return Task.CompletedTask;
}, receiveTcs);
await connection.StartAsync().OrTimeout();
Assert.Equal("42", await receiveTcs.Task.OrTimeout());
Assert.True(receivedRaised);
});
}
}
}
}

View File

@ -0,0 +1,122 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public partial class HttpConnectionTests
{
public class SendAsync
{
[Fact]
public async Task CanSendData()
{
var data = new byte[] { 1, 1, 2, 3, 5, 8 };
var testHttpHandler = new TestHttpMessageHandler();
var sendTcs = new TaskCompletionSource<byte[]>();
var longPollTcs = new TaskCompletionSource<HttpResponseMessage>();
testHttpHandler.OnLongPoll(cancellationToken => longPollTcs.Task);
testHttpHandler.OnSocketSend((buf, cancellationToken) =>
{
sendTcs.TrySetResult(buf);
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.Accepted));
});
await WithConnectionAsync(
CreateConnection(testHttpHandler),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await connection.SendAsync(data).OrTimeout();
Assert.Equal(data, await sendTcs.Task.OrTimeout());
longPollTcs.TrySetResult(ResponseUtils.CreateResponse(HttpStatusCode.NoContent));
});
}
[Fact]
public Task SendThrowsIfConnectionIsNotStarted()
{
return WithConnectionAsync(
CreateConnection(),
async (connection, closed) =>
{
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => connection.SendAsync(new byte[0]).OrTimeout());
Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
});
}
[Fact]
public Task SendThrowsIfConnectionIsStopped()
{
return WithConnectionAsync(
CreateConnection(),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await connection.StopAsync().OrTimeout();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => connection.SendAsync(new byte[0]).OrTimeout());
Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
});
}
[Fact]
public Task SendThrowsIfConnectionIsDisposed()
{
return WithConnectionAsync(
CreateConnection(),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => connection.SendAsync(new byte[0]).OrTimeout());
Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
});
}
[Fact]
public async Task CallerReceivesExceptionsFromSendAsync()
{
var testHttpHandler = new TestHttpMessageHandler();
var longPollTcs = new TaskCompletionSource<HttpResponseMessage>();
testHttpHandler.OnLongPoll(cancellationToken => longPollTcs.Task);
testHttpHandler.OnSocketSend((buf, cancellationToken) =>
{
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError));
});
await WithConnectionAsync(
CreateConnection(testHttpHandler),
async (connection, closed) =>
{
await connection.StartAsync().OrTimeout();
var exception = await Assert.ThrowsAsync<HttpRequestException>(
async () => await connection.SendAsync(new byte[0]).OrTimeout());
longPollTcs.TrySetResult(null);
});
}
}
}
}

View File

@ -35,7 +35,7 @@ namespace Microsoft.AspNetCore.Client.Tests
new UriBuilder(request.RequestUri).Path.EndsWith("/negotiate");
}
public static string CreateNegotiationResponse(string connectionId = "00000000-0000-0000-0000-000000000000",
public static string CreateNegotiationContent(string connectionId = "00000000-0000-0000-0000-000000000000",
SocketsTransportType? transportTypes = SocketsTransportType.All)
{
var sb = new StringBuilder("{ ");

View File

@ -9,17 +9,128 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public class TestHttpMessageHandler : HttpMessageHandler
{
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
private Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> _handler;
public TestHttpMessageHandler(bool autoNegotiate = true)
{
if (ResponseUtils.IsNegotiateRequest(request))
_handler = (request, cancellationToken) => BaseHandler(request, cancellationToken);
if (autoNegotiate)
{
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.OK,
ResponseUtils.CreateNegotiationResponse()));
OnNegotiate((_, cancellationToken) => ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationContent()));
}
else
}
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
await Task.Yield();
return await _handler(request, cancellationToken);
}
public static HttpMessageHandler CreateDefault()
{
var testHttpMessageHandler = new TestHttpMessageHandler();
testHttpMessageHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted));
testHttpMessageHandler.OnLongPoll(async cancellationToken =>
{
return Task.FromException<HttpResponseMessage>(new InvalidOperationException($"Http endpoint not implemented: {request.RequestUri}"));
}
// Just block until canceled
var tcs = new TaskCompletionSource<object>();
using (cancellationToken.Register(() => tcs.TrySetResult(null)))
{
await tcs.Task;
}
return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
});
return testHttpMessageHandler;
}
public void OnRequest(Func<HttpRequestMessage, Func<Task<HttpResponseMessage>>, CancellationToken, Task<HttpResponseMessage>> handler)
{
var nextHandler = _handler;
_handler = (request, cancellationToken) => handler(request, () => nextHandler(request, cancellationToken), cancellationToken);
}
public void OnGet(string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler) => OnRequest(HttpMethod.Get, pathAndQuery, handler);
public void OnPost(string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler) => OnRequest(HttpMethod.Post, pathAndQuery, handler);
public void OnPut(string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler) => OnRequest(HttpMethod.Put, pathAndQuery, handler);
public void OnDelete(string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler) => OnRequest(HttpMethod.Delete, pathAndQuery, handler);
public void OnHead(string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler) => OnRequest(HttpMethod.Head, pathAndQuery, handler);
public void OnOptions(string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler) => OnRequest(HttpMethod.Options, pathAndQuery, handler);
public void OnTrace(string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler) => OnRequest(HttpMethod.Trace, pathAndQuery, handler);
public void OnRequest(HttpMethod method, string pathAndQuery, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler)
{
OnRequest((request, next, cancellationToken) =>
{
if (request.Method.Equals(method) && string.Equals(request.RequestUri.PathAndQuery, pathAndQuery))
{
return handler(request, cancellationToken);
}
else
{
return next();
}
});
}
public void OnNegotiate(Func<HttpRequestMessage, CancellationToken, HttpResponseMessage> handler) => OnNegotiate((req, cancellationToken) => Task.FromResult(handler(req, cancellationToken)));
public void OnNegotiate(Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> handler)
{
OnRequest((request, next, cancellationToken) =>
{
if (ResponseUtils.IsNegotiateRequest(request))
{
return handler(request, cancellationToken);
}
else
{
return next();
}
});
}
public void OnLongPoll(Func<CancellationToken, HttpResponseMessage> handler) => OnLongPoll(cancellationToken => Task.FromResult(handler(cancellationToken)));
public void OnLongPoll(Func<CancellationToken, Task<HttpResponseMessage>> handler)
{
OnRequest((request, next, cancellationToken) =>
{
if (request.Method.Equals(HttpMethod.Get) && request.RequestUri.PathAndQuery.StartsWith("/?id="))
{
return handler(cancellationToken);
}
else
{
return next();
}
});
}
public void OnSocketSend(Func<byte[], CancellationToken, HttpResponseMessage> handler) => OnSocketSend((data, cancellationToken) => Task.FromResult(handler(data, cancellationToken)));
public void OnSocketSend(Func<byte[], CancellationToken, Task<HttpResponseMessage>> handler)
{
OnRequest(async (request, next, cancellationToken) =>
{
if (request.Method.Equals(HttpMethod.Post) && request.RequestUri.PathAndQuery.StartsWith("/?id="))
{
var data = await request.Content.ReadAsByteArrayAsync();
return await handler(data, cancellationToken);
}
else
{
return await next();
}
});
}
private Task<HttpResponseMessage> BaseHandler(HttpRequestMessage request, CancellationToken cancellationToken)
{
return Task.FromException<HttpResponseMessage>(new InvalidOperationException($"Http endpoint not implemented: {request.Method} {request.RequestUri}"));
}
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public class TestTransport : ITransport
{
private readonly Func<Task> _stopHandler;
private readonly Func<Task> _startHandler;
public TransferMode? Mode { get; }
public Channel<byte[], SendMessage> Application { get; private set; }
public TestTransport(Func<Task> onTransportStop = null, Func<Task> onTransportStart = null, TransferMode transferMode = TransferMode.Text)
{
_stopHandler = onTransportStop ?? new Func<Task>(() => Task.CompletedTask);
_startHandler = onTransportStart ?? new Func<Task>(() => Task.CompletedTask);
Mode = transferMode;
}
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
{
Application = application;
return _startHandler();
}
public async Task StopAsync()
{
await _stopHandler();
Application.Writer.TryComplete();
}
}
}

View File

@ -1,10 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Net.Http;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Client.Tests
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public class TestTransportFactory : ITransportFactory
{
@ -20,4 +22,4 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
return _transport;
}
}
}
}

View File

@ -5,10 +5,12 @@ using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Logging.Testing;
namespace Microsoft.AspNetCore.SignalR.Tests
@ -21,6 +23,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
private IWebHost _host;
private IApplicationLifetime _lifetime;
private readonly IDisposable _logToken;
private AsyncForwardingLoggerProvider _asyncLoggerProvider;
public string WebSocketsUrl => Url.Replace("http", "ws");
@ -28,14 +31,22 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public ServerFixture()
{
_asyncLoggerProvider = new AsyncForwardingLoggerProvider();
var testLog = AssemblyTestLog.ForAssembly(typeof(TStartup).Assembly);
_logToken = testLog.StartTestLog(null, $"{nameof(ServerFixture<TStartup>)}_{typeof(TStartup).Name}", out _loggerFactory, "ServerFixture");
_loggerFactory.AddProvider(_asyncLoggerProvider);
_logger = _loggerFactory.CreateLogger<ServerFixture<TStartup>>();
Url = "http://localhost:" + GetNextPort();
StartServer(Url);
}
public void SetTestLoggerFactory(ILoggerFactory loggerFactory)
{
_asyncLoggerProvider.SetLoggerFactory(loggerFactory);
}
private void StartServer(string url)
{
_host = new WebHostBuilder()
@ -74,6 +85,62 @@ namespace Microsoft.AspNetCore.SignalR.Tests
_loggerFactory.Dispose();
}
private class AsyncForwardingLoggerProvider : ILoggerProvider
{
private AsyncLocal<ILoggerFactory> _localLogger = new AsyncLocal<ILoggerFactory>();
public ILogger CreateLogger(string categoryName)
{
return new AsyncLocalForwardingLogger(categoryName, _localLogger);
}
public void Dispose()
{
}
public void SetLoggerFactory(ILoggerFactory loggerFactory)
{
_localLogger.Value = loggerFactory;
}
private class AsyncLocalForwardingLogger : ILogger
{
private string _categoryName;
private AsyncLocal<ILoggerFactory> _localLoggerFactory;
public AsyncLocalForwardingLogger(string categoryName, AsyncLocal<ILoggerFactory> localLoggerFactory)
{
_categoryName = categoryName;
_localLoggerFactory = localLoggerFactory;
}
public IDisposable BeginScope<TState>(TState state)
{
return GetLocalLogger().BeginScope(state);
}
public bool IsEnabled(LogLevel logLevel)
{
return GetLocalLogger().IsEnabled(logLevel);
}
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
GetLocalLogger().Log(logLevel, eventId, state, exception, formatter);
}
private ILogger GetLocalLogger()
{
var factory = _localLoggerFactory.Value;
if (factory == null)
{
return NullLogger.Instance;
}
return factory.CreateLogger(_categoryName);
}
}
}
private class ForwardingLoggerProvider : ILoggerProvider
{
private readonly ILoggerFactory _loggerFactory;

View File

@ -114,7 +114,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
.Returns<HttpRequestMessage, CancellationToken>(
(request, cancellationToken) => Task.FromException<HttpResponseMessage>(new InvalidOperationException("HTTP requests should not be sent.")));
var connection = new HttpConnection(new Uri(url), TransportType.WebSockets, loggerFactory, new HttpOptions { HttpMessageHandler = mockHttpHandler.Object});
var connection = new HttpConnection(new Uri(url), TransportType.WebSockets, loggerFactory, new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
try
{
@ -326,6 +326,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
using (StartLog(out var loggerFactory, testName: $"ConnectionCanSendAndReceiveMessages_{transportType.ToString()}"))
{
_serverFixture.SetTestLoggerFactory(loggerFactory);
var logger = loggerFactory.CreateLogger<EndToEndTests>();
var url = _serverFixture.Url + "/uncreatable";
@ -351,13 +353,24 @@ namespace Microsoft.AspNetCore.SignalR.Tests
};
logger.LogInformation("Starting connection to {url}", url);
await connection.StartAsync().OrTimeout();
try
{
await connection.StartAsync().OrTimeout();
}
catch (OperationCanceledException)
{
// Due to a race, this can fail with OperationCanceledException in the SendAsync
// call that HubConnection does to send the negotiate message.
// This has only been happening on AppVeyor, likely due to a slower CI machine
// The closed event will still fire with the exception we care about.
}
await closeTcs.Task.OrTimeout();
}
catch (Exception ex)
{
logger.LogInformation(ex, "Test threw exception");
logger.LogError(ex, "Test threw {exceptionType}: {message}", ex.GetType(), ex.Message);
throw;
}
finally

View File

@ -1,4 +1,4 @@
{
"longRunningTestSeconds": 5,
"diagnosticMessages": false
}
"diagnosticMessages": true
}