// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Features;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Moq;
using Moq.Protected;
using Xunit;
using Xunit.Abstractions;

namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
    /// <summary>
    /// Tests for <see cref="HttpConnection"/> covering construction, start/stop/dispose transitions,
    /// sending, receiving, the Closed event and negotiation handling. HTTP traffic is served by a
    /// mocked <see cref="HttpMessageHandler"/>; some tests also replace the transport with a mock.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file (exception types passed to Assert.Throws*,
    /// Channel&lt;byte[], SendMessage&gt;, ITransferModeFeature, ...) were reconstructed from context
    /// after being lost from the text - confirm against the product code.
    /// </remarks>
    public partial class HttpConnectionTests : LoggedTest
    {
        public HttpConnectionTests(ITestOutputHelper output)
            : base(output)
        {
        }

        // Builds a mocked HttpMessageHandler whose SendAsync yields once and then returns what 'respond' produces.
        private static HttpMessageHandler CreateFakeHttpHandler(Func<HttpRequestMessage, Task<HttpResponseMessage>> respond)
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    // Never complete synchronously, so callers always observe a pending task first.
                    await Task.Yield();
                    return await respond(request);
                });
            return mockHttpHandler.Object;
        }

        // Same as the asynchronous overload, for responders that compute their response synchronously.
        private static HttpMessageHandler CreateFakeHttpHandler(Func<HttpRequestMessage, HttpResponseMessage> respond)
        {
            return CreateFakeHttpHandler(request => Task.FromResult(respond(request)));
        }

        // Handler that answers negotiate requests with a negotiation payload and every other request with an empty 200.
        private static HttpMessageHandler CreateNegotiatingHttpHandler()
        {
            return CreateFakeHttpHandler(request => ResponseUtils.IsNegotiateRequest(request)
                ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                : ResponseUtils.CreateResponse(HttpStatusCode.OK));
        }

        // Handler that answers negotiate requests with a negotiation payload; other requests get a 200 whose
        // content is 'pollContent' for GET requests and empty otherwise.
        private static HttpMessageHandler CreatePollContentHttpHandler(string pollContent)
        {
            return CreateFakeHttpHandler(request =>
            {
                var content = request.Method == HttpMethod.Get ? pollContent : string.Empty;
                return ResponseUtils.IsNegotiateRequest(request)
                    ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                    : ResponseUtils.CreateResponse(HttpStatusCode.OK, content);
            });
        }

        // Answers a non-negotiate request with 500 when its body is the single byte 0x42, otherwise with 200.
        private static async Task<HttpResponseMessage> RespondFailingOnPoisonByteAsync(HttpRequestMessage request)
        {
            var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null;
            return (content?.Length == 1 && content[0] == 0x42)
                ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
                : ResponseUtils.CreateResponse(HttpStatusCode.OK);
        }

        // Creates a long-polling HttpConnection whose HTTP traffic goes through 'httpHandler'.
        private static HttpConnection CreateLongPollingConnection(HttpMessageHandler httpHandler, string url = "http://fakeuri.org/")
        {
            return new HttpConnection(new Uri(url), TransportType.LongPolling, loggerFactory: null,
                httpOptions: new HttpOptions { HttpMessageHandler = httpHandler });
        }

        // Creates an HttpConnection whose transport comes from a TestTransportFactory wrapping 'transport'.
        private static HttpConnection CreateConnectionWithTransport(HttpMessageHandler httpHandler, ITransport transport, ILoggerFactory loggerFactory = null)
        {
            return new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(transport), loggerFactory,
                httpOptions: new HttpOptions { HttpMessageHandler = httpHandler });
        }

        // Registers a StartAsync setup on a mocked transport that hands the received channel to 'onStarted'.
        private static void SetupTransportStart(Mock<ITransport> transport, Action<Channel<byte[], SendMessage>> onStarted)
        {
            transport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
                .Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, channel, transferMode, connectionId, connection) =>
                {
                    onStarted(channel);
                    return Task.CompletedTask;
                });
        }

        [Fact]
        public void CannotCreateConnectionWithNullUrl()
        {
            var exception = Assert.Throws<ArgumentNullException>(() => new HttpConnection(null));
            Assert.Equal("url", exception.ParamName);
        }

        [Fact]
        public void ConnectionReturnsUrlUsedToStartTheConnection()
        {
            var connectionUrl = new Uri("http://fakeuri.org/");
            Assert.Equal(connectionUrl, new HttpConnection(connectionUrl).Url);
        }

        [Theory]
        [InlineData((TransportType)0)]
        [InlineData(TransportType.All + 1)]
        public void CannotStartConnectionWithInvalidTransportType(TransportType requestedTransportType)
        {
            Assert.Throws<ArgumentOutOfRangeException>(
                () => new HttpConnection(new Uri("http://fakeuri.org/"), requestedTransportType));
        }

        [Fact]
        public async Task CannotStartRunningConnection()
        {
            var connection = CreateLongPollingConnection(CreateNegotiatingHttpHandler());

            try
            {
                await connection.StartAsync();

                var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                    async () => await connection.StartAsync());
                Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }

        [Fact]
        public async Task CannotStartConnectionDisposedAfterStarting()
        {
            var connection = CreateLongPollingConnection(CreateNegotiatingHttpHandler());

            await connection.StartAsync();
            await connection.DisposeAsync();

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                async () => await connection.StartAsync());
            Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
        }

        [Fact]
        public async Task CannotStartDisposedConnection()
        {
            var connection = new HttpConnection(new Uri("http://fakeuri.org/"));
            await connection.DisposeAsync();

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                async () => await connection.StartAsync());
            Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
        }

        [Fact]
        public async Task CanDisposeStartingConnection()
        {
            // Used to make sure StartAsync is not completed before DisposeAsync is called
            var releaseNegotiateTcs = new TaskCompletionSource<object>();
            // Used to make sure that DisposeAsync runs after we check the state in StartAsync
            var allowDisposeTcs = new TaskCompletionSource<object>();
            // Used to make sure that DisposeAsync continues only after StartAsync finished
            var releaseDisposeTcs = new TaskCompletionSource<object>();

            var httpHandler = CreateFakeHttpHandler(async request =>
            {
                // allow DisposeAsync to continue once we know we are past the connection state check
                allowDisposeTcs.SetResult(null);
                await releaseNegotiateTcs.Task;
                return ResponseUtils.IsNegotiateRequest(request)
                    ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                    : ResponseUtils.CreateResponse(HttpStatusCode.OK);
            });

            var transport = new Mock<ITransport>();
            Channel<byte[], SendMessage> channel = null;
            transport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
            SetupTransportStart(transport, c => channel = c);
            transport.Setup(t => t.StopAsync()).Returns(async () =>
            {
                await releaseDisposeTcs.Task;
                channel.Writer.TryComplete();
            });

            var connection = CreateConnectionWithTransport(httpHandler, transport.Object);

            var startTask = connection.StartAsync();
            await allowDisposeTcs.Task;
            var disposeTask = connection.DisposeAsync();

            // allow StartAsync to continue once DisposeAsync has started
            releaseNegotiateTcs.SetResult(null);

            // unblock DisposeAsync only after StartAsync completed
            await startTask.OrTimeout();
            releaseDisposeTcs.SetResult(null);
            await disposeTask.OrTimeout();
        }

        [Fact]
        public async Task CanStartConnectionThatFailedToStart()
        {
            var failNegotiate = true;
            var httpHandler = CreateFakeHttpHandler(request =>
            {
                if (ResponseUtils.IsNegotiateRequest(request))
                {
                    return failNegotiate
                        ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
                        : ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
                }

                return ResponseUtils.CreateResponse(HttpStatusCode.OK);
            });

            var connection = CreateLongPollingConnection(httpHandler);

            // The first start has to fail (negotiate answers 500) for this test to mean anything;
            // assert that explicitly instead of swallowing whatever happens.
            await Assert.ThrowsAnyAsync<Exception>(
                async () => await connection.StartAsync().OrTimeout());

            failNegotiate = false;
            await connection.StartAsync().OrTimeout();
            await connection.DisposeAsync().OrTimeout();
        }

        [Fact]
        public async Task CanStartStoppedConnection()
        {
            var connection = CreateLongPollingConnection(CreateNegotiatingHttpHandler());

            await connection.StartAsync().OrTimeout();
            await connection.StopAsync().OrTimeout();
            await connection.StartAsync().OrTimeout();
            await connection.DisposeAsync().OrTimeout();
        }

        [Fact]
        public async Task CanStopStartingConnection()
        {
            var allowStopTcs = new TaskCompletionSource<object>();
            var httpHandler = CreateFakeHttpHandler(async request =>
            {
                if (ResponseUtils.IsNegotiateRequest(request))
                {
                    // Negotiation is in flight, so StartAsync is running: let the test call StopAsync now.
                    allowStopTcs.SetResult(null);
                    return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
                }

                return await RespondFailingOnPoisonByteAsync(request);
            });

            var connection = CreateLongPollingConnection(httpHandler);

            var closeTcs = new TaskCompletionSource<object>();
            connection.Closed += e => closeTcs.TrySetResult(null);

            var startTask = connection.StartAsync();
            await allowStopTcs.Task.OrTimeout();

            await Task.WhenAll(startTask, connection.StopAsync()).OrTimeout();
            await closeTcs.Task.OrTimeout();
        }

        [Fact]
        public async Task CanStartConnectionAfterConnectionStoppedWithError()
        {
            var httpHandler = CreateFakeHttpHandler(async request =>
                ResponseUtils.IsNegotiateRequest(request)
                    ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                    : await RespondFailingOnPoisonByteAsync(request));

            var connection = CreateLongPollingConnection(httpHandler);

            var closeTcs = new TaskCompletionSource<object>();
            connection.Closed += e => closeTcs.TrySetResult(null);

            await connection.StartAsync().OrTimeout();
            try
            {
                // 0x42 makes the fake server answer 500.
                await connection.SendAsync(new byte[] { 0x42 }).OrTimeout();
            }
            catch
            {
                // Deliberately ignored: the outcome is observed through the Closed event below.
            }

            await closeTcs.Task.OrTimeout();
            await connection.StartAsync().OrTimeout();
            await connection.DisposeAsync().OrTimeout();
        }

        [Fact]
        public async Task CanDisposeStoppedConnection()
        {
            var connection = new HttpConnection(new Uri("http://fakeuri.org/"));

            await connection.StopAsync().OrTimeout();
            await connection.DisposeAsync().OrTimeout();
        }

        [Fact]
        public async Task StoppingStoppingConnectionNoOps()
        {
            var httpHandler = CreateFakeHttpHandler(async request =>
                ResponseUtils.IsNegotiateRequest(request)
                    ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                    : await RespondFailingOnPoisonByteAsync(request));

            var connection = CreateLongPollingConnection(httpHandler);

            var closeTcs = new TaskCompletionSource<object>();
            connection.Closed += e => closeTcs.TrySetResult(null);

            await connection.StartAsync().OrTimeout();
            // Two concurrent stops must both complete.
            await Task.WhenAll(connection.StopAsync().OrTimeout(), connection.StopAsync().OrTimeout());
            await closeTcs.Task.OrTimeout();
        }

        [Fact]
        public async Task DisposedStoppingConnectionDisposesConnection()
        {
            var httpHandler = CreateFakeHttpHandler(async request =>
                ResponseUtils.IsNegotiateRequest(request)
                    ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                    : await RespondFailingOnPoisonByteAsync(request));

            var connection = CreateLongPollingConnection(httpHandler);

            var closeTcs = new TaskCompletionSource<object>();
            connection.Closed += e => closeTcs.TrySetResult(null);

            await connection.StartAsync().OrTimeout();
            await Task.WhenAll(connection.StopAsync().OrTimeout(), connection.DisposeAsync().OrTimeout());
            await closeTcs.Task.OrTimeout();

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.StartAsync());
            Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
        }

        [Fact]
        public async Task SendThrowsIfConnectionIsNotStarted()
        {
            var connection = new HttpConnection(new Uri("http://fakeuri.org/"));

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                async () => await connection.SendAsync(new byte[0]));
            Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
        }

        [Fact]
        public async Task SendThrowsIfConnectionIsDisposed()
        {
            var connection = CreateLongPollingConnection(CreateNegotiatingHttpHandler());

            await connection.StartAsync();
            await connection.DisposeAsync();

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                async () => await connection.SendAsync(new byte[0]));
            Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
        }

        [Fact]
        public async Task ClosedEventRaisedWhenTheClientIsBeingStopped()
        {
            var connection = CreateLongPollingConnection(CreateNegotiatingHttpHandler());

            // Surface the exception passed to Closed (if any) through the task.
            var closeTcs = new TaskCompletionSource<object>();
            connection.Closed += e =>
            {
                if (e != null)
                {
                    closeTcs.SetException(e);
                }
                else
                {
                    closeTcs.SetResult(null);
                }
            };

            await connection.StartAsync().OrTimeout();
            await connection.DisposeAsync().OrTimeout();
            await closeTcs.Task.OrTimeout();
        }

        [Fact]
        public async Task ClosedEventRaisedWhenConnectionToServerLost()
        {
            // Every GET fails with 500; negotiate and other requests succeed.
            var httpHandler = CreateFakeHttpHandler(request =>
                request.Method == HttpMethod.Get
                    ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
                    : ResponseUtils.IsNegotiateRequest(request)
                        ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                        : ResponseUtils.CreateResponse(HttpStatusCode.OK));

            var connection = CreateLongPollingConnection(httpHandler);

            var closeTcs = new TaskCompletionSource<object>();
            connection.Closed += e =>
            {
                if (e != null)
                {
                    closeTcs.SetException(e);
                }
                else
                {
                    closeTcs.SetResult(null);
                }
            };

            try
            {
                await connection.StartAsync().OrTimeout();
                await Assert.ThrowsAsync<HttpRequestException>(() => closeTcs.Task.OrTimeout());
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }

        [Fact]
        public async Task EventsAreNotRunningOnMainLoop()
        {
            var httpHandler = CreateNegotiatingHttpHandler();

            var mockTransport = new Mock<ITransport>();
            Channel<byte[], SendMessage> channel = null;
            SetupTransportStart(mockTransport, c => channel = c);
            mockTransport.Setup(t => t.StopAsync())
                .Returns(() =>
                {
                    channel.Writer.TryComplete();
                    return Task.CompletedTask;
                });
            mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);

            var callbackInvokedTcs = new TaskCompletionSource<object>();
            var closedTcs = new TaskCompletionSource<object>();

            var connection = CreateConnectionWithTransport(httpHandler, mockTransport.Object);
            connection.OnReceived(_ =>
            {
                callbackInvokedTcs.SetResult(null);
                // Keep the callback pending until the test completes closedTcs.
                return closedTcs.Task;
            });

            await connection.StartAsync();
            channel.Writer.TryWrite(Array.Empty<byte>());

            // Ensure that the Received callback has been called before attempting the second write
            await callbackInvokedTcs.Task.OrTimeout();
            channel.Writer.TryWrite(Array.Empty<byte>());

            // Ensure that SignalR isn't blocked by the receive callback
            Assert.False(channel.Reader.TryRead(out _));

            closedTcs.SetResult(null);

            await connection.DisposeAsync();
        }

        [Fact]
        public async Task EventQueueTimeout()
        {
            using (StartLog(out var loggerFactory))
            {
                var logger = loggerFactory.CreateLogger<HttpConnectionTests>();

                var httpHandler = CreateNegotiatingHttpHandler();

                var mockTransport = new Mock<ITransport>();
                Channel<byte[], SendMessage> channel = null;
                SetupTransportStart(mockTransport, c =>
                {
                    logger.LogInformation("Transport started");
                    channel = c;
                });
                mockTransport.Setup(t => t.StopAsync())
                    .Returns(() =>
                    {
                        logger.LogInformation("Transport stopped");
                        channel.Writer.TryComplete();
                        return Task.CompletedTask;
                    });
                mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);

                // Never completed: the receive callback stays blocked for the rest of the test.
                var blockReceiveCallbackTcs = new TaskCompletionSource<object>();
                var onReceivedCalledTcs = new TaskCompletionSource<object>();

                var connection = CreateConnectionWithTransport(httpHandler, mockTransport.Object, loggerFactory);
                connection.OnReceived(async _ =>
                {
                    onReceivedCalledTcs.TrySetResult(null);
                    await blockReceiveCallbackTcs.Task;
                });

                logger.LogInformation("Starting connection");
                await connection.StartAsync().OrTimeout();
                logger.LogInformation("Started connection");

                channel.Writer.TryWrite(Array.Empty<byte>());
                await onReceivedCalledTcs.Task.OrTimeout();

                // Ensure that SignalR isn't blocked by the receive callback
                Assert.False(channel.Reader.TryRead(out _));

                logger.LogInformation("Disposing connection");
                await connection.DisposeAsync().OrTimeout(TimeSpan.FromSeconds(10));
                logger.LogInformation("Disposed connection");
            }
        }

        [Fact]
        public async Task EventQueueTimeoutWithException()
        {
            var httpHandler = CreateNegotiatingHttpHandler();

            var mockTransport = new Mock<ITransport>();
            Channel<byte[], SendMessage> channel = null;
            SetupTransportStart(mockTransport, c => channel = c);
            mockTransport.Setup(t => t.StopAsync())
                .Returns(() =>
                {
                    channel.Writer.TryComplete();
                    return Task.CompletedTask;
                });
            mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);

            var connection = CreateConnectionWithTransport(httpHandler, mockTransport.Object);
            connection.OnReceived(_ =>
            {
                throw new OperationCanceledException();
            });

            await connection.StartAsync();
            channel.Writer.TryWrite(Array.Empty<byte>());

            // Ensure that SignalR isn't blocked by the receive callback
            Assert.False(channel.Reader.TryRead(out _));

            await connection.DisposeAsync();
        }

        [Fact]
        public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted()
        {
            var connection = new HttpConnection(new Uri("http://fakeuri.org/"));

            var closeInvoked = false;
            connection.Closed += e => closeInvoked = true;

            await connection.DisposeAsync();
            Assert.False(closeInvoked);
        }

        [Fact]
        public async Task TransportIsStoppedWhenConnectionIsStopped()
        {
            var httpHandler = CreateNegotiatingHttpHandler();

            using (var httpClient = new HttpClient(httpHandler))
            {
                var longPollingTransport = new LongPollingTransport(httpClient, null, new LoggerFactory());
                var connection = CreateConnectionWithTransport(httpHandler, longPollingTransport);

                try
                {
                    await connection.StartAsync();

                    Assert.False(longPollingTransport.Running.IsCompleted);
                }
                finally
                {
                    await connection.DisposeAsync();
                }

                await longPollingTransport.Running.OrTimeout();
            }
        }

        [Fact]
        public async Task CanSendData()
        {
            var data = new byte[] { 1, 1, 2, 3, 5, 8 };

            // Completed with the body of the first POST the fake server sees.
            var sendTcs = new TaskCompletionSource<byte[]>();
            var httpHandler = CreateFakeHttpHandler(async request =>
            {
                if (ResponseUtils.IsNegotiateRequest(request))
                {
                    return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
                }

                if (request.Method == HttpMethod.Post)
                {
                    sendTcs.SetResult(await request.Content.ReadAsByteArrayAsync());
                }

                return ResponseUtils.CreateResponse(HttpStatusCode.OK);
            });

            var connection = CreateLongPollingConnection(httpHandler);

            try
            {
                await connection.StartAsync();

                await connection.SendAsync(data);

                Assert.Equal(data, await sendTcs.Task.OrTimeout());
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }

        [Fact]
        public async Task SendAsyncThrowsIfConnectionIsNotStarted()
        {
            var connection = new HttpConnection(new Uri("http://fakeuri.org/"));

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                async () => await connection.SendAsync(new byte[0]));
            Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
        }

        [Fact]
        public async Task SendAsyncThrowsIfConnectionIsDisposed()
        {
            var connection = CreateLongPollingConnection(CreatePollContentHttpHandler("T2:T:42;"));

            await connection.StartAsync();
            await connection.DisposeAsync();

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                async () => await connection.SendAsync(new byte[0]));
            Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
        }

        [Fact]
        public async Task CallerReceivesExceptionsFromSendAsync()
        {
            // Negotiate succeeds, every other POST fails with 500, anything else succeeds.
            var httpHandler = CreateFakeHttpHandler(request =>
                ResponseUtils.IsNegotiateRequest(request)
                    ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                    : request.Method == HttpMethod.Post
                        ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
                        : ResponseUtils.CreateResponse(HttpStatusCode.OK));

            var connection = CreateLongPollingConnection(httpHandler);

            try
            {
                await connection.StartAsync();

                await Assert.ThrowsAsync<HttpRequestException>(
                    async () => await connection.SendAsync(new byte[0]));
            }
            finally
            {
                // Dispose even when the assertion fails, like the other tests in this class.
                await connection.DisposeAsync();
            }
        }

        [Fact]
        public async Task CanReceiveData()
        {
            var connection = CreateLongPollingConnection(CreatePollContentHttpHandler("42"));

            try
            {
                var receiveTcs = new TaskCompletionSource<string>();
                connection.OnReceived((data, state) =>
                {
                    var tcs = (TaskCompletionSource<string>)state;
                    tcs.TrySetResult(Encoding.UTF8.GetString(data));
                    return Task.CompletedTask;
                }, receiveTcs);

                // If the connection closes first, fail (or cancel) the wait instead of timing out.
                connection.Closed += e =>
                {
                    if (e != null)
                    {
                        receiveTcs.TrySetException(e);
                    }
                    else
                    {
                        receiveTcs.TrySetCanceled();
                    }
                };

                await connection.StartAsync().OrTimeout();

                Assert.Equal("42", await receiveTcs.Task.OrTimeout());
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }

        [Fact]
        public async Task CanReceiveDataEvenIfExceptionThrownFromPreviousReceivedEvent()
        {
            var connection = CreateLongPollingConnection(CreatePollContentHttpHandler("42"));

            try
            {
                var receiveTcs = new TaskCompletionSource<string>();

                var receivedRaised = false;
                connection.OnReceived(data =>
                {
                    if (!receivedRaised)
                    {
                        // First delivery fails through a faulted task.
                        receivedRaised = true;
                        return Task.FromException(new InvalidOperationException());
                    }

                    receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
                    return Task.CompletedTask;
                });

                connection.Closed += e =>
                {
                    if (e != null)
                    {
                        receiveTcs.TrySetException(e);
                    }
                    else
                    {
                        receiveTcs.TrySetCanceled();
                    }
                };

                await connection.StartAsync();

                Assert.Equal("42", await receiveTcs.Task.OrTimeout());
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }

        [Fact]
        public async Task CanReceiveDataEvenIfExceptionThrownSynchronouslyFromPreviousReceivedEvent()
        {
            var connection = CreateLongPollingConnection(CreatePollContentHttpHandler("42"));

            try
            {
                var receiveTcs = new TaskCompletionSource<string>();

                var receivedRaised = false;
                connection.OnReceived((data) =>
                {
                    if (!receivedRaised)
                    {
                        // First delivery fails by throwing before any task is returned.
                        receivedRaised = true;
                        throw new InvalidOperationException();
                    }

                    receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
                    return Task.CompletedTask;
                });

                connection.Closed += e =>
                {
                    if (e != null)
                    {
                        receiveTcs.TrySetException(e);
                    }
                    else
                    {
                        receiveTcs.TrySetCanceled();
                    }
                };

                await connection.StartAsync();

                Assert.Equal("42", await receiveTcs.Task.OrTimeout());
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }

        [Fact]
        public async Task CannotSendAfterReceiveThrewException()
        {
            // Every GET fails with 500; negotiate and other requests succeed.
            var httpHandler = CreateFakeHttpHandler(request =>
                request.Method == HttpMethod.Get
                    ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
                    : ResponseUtils.IsNegotiateRequest(request)
                        ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
                        : ResponseUtils.CreateResponse(HttpStatusCode.OK));

            var connection = CreateLongPollingConnection(httpHandler);

            try
            {
                var closeTcs = new TaskCompletionSource<object>();
                connection.Closed += e =>
                {
                    if (e != null)
                    {
                        closeTcs.SetException(e);
                    }
                    else
                    {
                        closeTcs.SetResult(null);
                    }
                };

                await connection.StartAsync().OrTimeout();

                // Wait for the failure to close the connection before trying to send.
                await Assert.ThrowsAsync<HttpRequestException>(() => closeTcs.Task.OrTimeout());

                var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync(new byte[0]));
                Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }

        [Theory]
        [InlineData("")]
        [InlineData("Not Json")]
        public async Task StartThrowsFormatExceptionIfNegotiationResponseIsInvalid(string negotiatePayload)
        {
            var httpHandler = CreateFakeHttpHandler(
                request => ResponseUtils.CreateResponse(HttpStatusCode.OK, negotiatePayload));

            var connection = CreateLongPollingConnection(httpHandler);

            var exception = await Assert.ThrowsAsync<FormatException>(
                () => connection.StartAsync());
            Assert.Equal("Invalid negotiation response received.", exception.Message);
        }

        [Fact]
        public async Task StartThrowsFormatExceptionIfNegotiationResponseHasNoConnectionId()
        {
            var httpHandler = CreateFakeHttpHandler(
                request => ResponseUtils.CreateResponse(HttpStatusCode.OK,
                    ResponseUtils.CreateNegotiationResponse(connectionId: null)));

            var connection = CreateLongPollingConnection(httpHandler);

            var exception = await Assert.ThrowsAsync<FormatException>(
                () => connection.StartAsync());
            Assert.Equal("Invalid connection id returned in negotiation response.", exception.Message);
        }

        [Fact]
        public async Task StartThrowsFormatExceptionIfNegotiationResponseHasNoTransports()
        {
            var httpHandler = CreateFakeHttpHandler(
                request => ResponseUtils.CreateResponse(HttpStatusCode.OK,
                    ResponseUtils.CreateNegotiationResponse(transportTypes: null)));

            var connection = CreateLongPollingConnection(httpHandler);

            var exception = await Assert.ThrowsAsync<FormatException>(
                () => connection.StartAsync());
            Assert.Equal("No transports returned in negotiation response.", exception.Message);
        }

        [Theory]
        [InlineData((TransportType)0)]
        [InlineData(TransportType.ServerSentEvents)]
        public async Task ConnectionCannotBeStartedIfNoCommonTransportsBetweenClientAndServer(TransportType serverTransports)
        {
            var httpHandler = CreateFakeHttpHandler(
                request => ResponseUtils.CreateResponse(HttpStatusCode.OK,
                    ResponseUtils.CreateNegotiationResponse(transportTypes: serverTransports)));

            // The client only asks for LongPolling, which the fake server never offers.
            var connection = CreateLongPollingConnection(httpHandler);

            var exception = await Assert.ThrowsAsync<InvalidOperationException>(
                () => connection.StartAsync());
            Assert.Equal("No requested transports available on the server.", exception.Message);
        }

        [Fact]
        public async Task CanStartConnectionWithoutSettingTransferModeFeature()
        {
            var httpHandler = CreateNegotiatingHttpHandler();

            var mockTransport = new Mock<ITransport>();
            Channel<byte[], SendMessage> channel = null;
            SetupTransportStart(mockTransport, c => channel = c);
            mockTransport.Setup(t => t.StopAsync())
                .Returns(() =>
                {
                    channel.Writer.TryComplete();
                    return Task.CompletedTask;
                });
            mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Binary);

            var connection = CreateConnectionWithTransport(httpHandler, mockTransport.Object);

            await connection.StartAsync().OrTimeout();
            var transferModeFeature = connection.Features.Get<ITransferModeFeature>();
            await connection.DisposeAsync().OrTimeout();

            // The transport is started with Text, while the feature reports the mode the transport exposes (Binary).
            mockTransport.Verify(t => t.StartAsync(
                It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), TransferMode.Text, It.IsAny<string>(), It.IsAny<IConnection>()), Times.Once);
            Assert.NotNull(transferModeFeature);
            Assert.Equal(TransferMode.Binary, transferModeFeature.TransferMode);
        }

        [Theory]
        [InlineData("http://fakeuri.org/", "http://fakeuri.org/negotiate")]
        [InlineData("http://fakeuri.org/?q=1/0", "http://fakeuri.org/negotiate?q=1/0")]
        [InlineData("http://fakeuri.org?q=1/0", "http://fakeuri.org/negotiate?q=1/0")]
        [InlineData("http://fakeuri.org/endpoint", "http://fakeuri.org/endpoint/negotiate")]
        [InlineData("http://fakeuri.org/endpoint/", "http://fakeuri.org/endpoint/negotiate")]
        [InlineData("http://fakeuri.org/endpoint?q=1/0", "http://fakeuri.org/endpoint/negotiate?q=1/0")]
        public async Task CorrectlyHandlesQueryStringWhenAppendingNegotiateToUrl(string requested, string expectedNegotiate)
        {
            var httpHandler = CreateFakeHttpHandler(request =>
            {
                // Runs for every request the connection makes through this handler.
                Assert.Equal(expectedNegotiate, request.RequestUri.ToString());
                return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
            });

            var connection = CreateLongPollingConnection(httpHandler, requested);

            await connection.StartAsync().OrTimeout();
            await connection.DisposeAsync().OrTimeout();
        }
    }
}