// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reflection;
using System.Runtime.InteropServices.ComTypes;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
using Microsoft.AspNetCore.SignalR.Tests;
using Moq;
using Moq.Protected;
using Xunit;
using Xunit.Abstractions;

namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
    /// <summary>
    /// Tests for <see cref="LongPollingTransport"/>. Each test drives the transport through an
    /// <see cref="HttpClient"/> whose handler is mocked, so the responses to poll (GET),
    /// send (POST) and delete (DELETE) requests are scripted by the test itself.
    /// </summary>
    public class LongPollingTransportTests : VerifiableLoggedTest
    {
        private static readonly Uri TestUri = new Uri("http://example.com/?id=1234");

        public LongPollingTransportTests(ITestOutputHelper output)
            : base(output)
        {
        }

        /// <summary>
        /// The transport's Running task is still pending right after start and completes once StopAsync is called.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportStopsPollAndSendLoopsWhenTransportStopped()
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();
                    return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                });

            Task transportActiveTask;

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    transportActiveTask = longPollingTransport.Running;

                    Assert.False(transportActiveTask.IsCompleted);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }

                await transportActiveTask.OrTimeout();
            }
        }

        /// <summary>
        /// When every request is answered with 204, Running completes and the input pipe reads as completed.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportStopsWhenPollReceives204()
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();
                    return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    await longPollingTransport.Running.OrTimeout();

                    Assert.True(longPollingTransport.Input.TryRead(out var result));
                    Assert.True(result.IsCompleted);
                    longPollingTransport.Input.AdvanceTo(result.Buffer.End);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// 200 responses with an empty body interleaved with "Hello" and "World" do not end the poll loop;
        /// the data read from the input is "HelloWorld" once the final 204 arrives.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportResponseWithNoContentDoesNotStopPoll()
        {
            var requests = 0;
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    if (requests == 0)
                    {
                        requests++;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                    }
                    else if (requests == 1)
                    {
                        requests++;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK, "Hello");
                    }
                    else if (requests == 2)
                    {
                        requests++;
                        // Time out
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                    }
                    else if (requests == 3)
                    {
                        requests++;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK, "World");
                    }

                    // Done
                    return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    var data = await longPollingTransport.Input.ReadAllAsync().OrTimeout();
                    await longPollingTransport.Running.OrTimeout();
                    Assert.Equal(Encoding.UTF8.GetBytes("HelloWorld"), data);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// StartAsync throws, with " 500 " in the message, when the first request is answered with a 500.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportStartAsyncFailsIfFirstRequestFails()
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();
                    return ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    var exception = await Assert.ThrowsAsync<HttpRequestException>(
                        () => longPollingTransport.StartAsync(TestUri, TransferFormat.Binary));

                    Assert.Contains(" 500 ", exception.Message);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// After a successful first request, a 500 on a later request surfaces as an exception
        /// (with " 500 " in the message) when reading the transport's input.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportStopsWhenPollRequestFails()
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            var firstPoll = true;
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    if (firstPoll)
                    {
                        firstPoll = false;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                    }

                    return ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    var exception =
                        await Assert.ThrowsAsync<HttpRequestException>(async () =>
                        {
                            // Wrap the read in a Task-returning local function so OrTimeout can be applied to it.
                            async Task ReadAsync()
                            {
                                await longPollingTransport.Input.ReadAsync();
                            }

                            await ReadAsync().OrTimeout();
                        });

                    Assert.Contains(" 500 ", exception.Message);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// StopAsync completes even though the DELETE request is answered with 404.
        /// </summary>
        [Fact]
        public async Task StopTransportWhenConnectionAlreadyStoppedOnServer()
        {
            // Run continuations asynchronously so SetResult below does not execute the handler inline on the test thread.
            var pollRequestTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

            var mockHttpHandler = new Mock<HttpMessageHandler>();
            var firstPoll = true;
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    if (request.Method == HttpMethod.Delete)
                    {
                        // Simulate the server having already cleaned up the connection on the server
                        return ResponseUtils.CreateResponse(HttpStatusCode.NotFound);
                    }
                    else
                    {
                        if (firstPoll)
                        {
                            firstPoll = false;
                            return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                        }

                        // Hold every later poll open until the test releases it.
                        await pollRequestTcs.Task;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                    }
                });

            using (StartVerifiableLog(out var loggerFactory))
            {
                using (var httpClient = new HttpClient(mockHttpHandler.Object))
                {
                    var longPollingTransport = new LongPollingTransport(httpClient, loggerFactory);

                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary).OrTimeout();

                    var stopTask = longPollingTransport.StopAsync();

                    pollRequestTcs.SetResult(null);

                    await stopTask.OrTimeout();
                }
            }
        }

        /// <summary>
        /// A 500 response to a POST ends the transport: Running completes, reading the input throws
        /// (with " 500 " in the message), and a DELETE request was observed by the handler.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportStopsWhenSendRequestFails()
        {
            var stopped = false;
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    switch (request.Method.Method)
                    {
                        case "DELETE":
                            stopped = true;
                            return ResponseUtils.CreateResponse(HttpStatusCode.Accepted);
                        case "GET" when stopped:
                            return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
                        case "GET":
                            return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                        case "POST":
                            return ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError);
                        default:
                            throw new InvalidOperationException("Unexpected request");
                    }
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    await longPollingTransport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));

                    await longPollingTransport.Running.OrTimeout();

                    var exception = await Assert.ThrowsAsync<HttpRequestException>(
                        async () => await longPollingTransport.Input.ReadAllAsync().OrTimeout());
                    Assert.Contains(" 500 ", exception.Message);

                    Assert.True(stopped);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// Completing the transport's output makes Running complete and the input drain;
        /// the handler answers polls with 204 once a DELETE has been seen.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportShutsDownWhenChannelIsClosed()
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            var stopped = false;
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    if (request.Method == HttpMethod.Delete)
                    {
                        stopped = true;
                        return ResponseUtils.CreateResponse(HttpStatusCode.Accepted);
                    }
                    else
                    {
                        return stopped
                            ? ResponseUtils.CreateResponse(HttpStatusCode.NoContent)
                            : ResponseUtils.CreateResponse(HttpStatusCode.OK);
                    }
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    longPollingTransport.Output.Complete();

                    await longPollingTransport.Running.OrTimeout();

                    await longPollingTransport.Input.ReadAllAsync().OrTimeout();
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// Completing the output makes Running complete even though the handler never answers with 204.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportShutsDownImmediatelyEvenIfServerDoesntCompletePoll()
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();
                    return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    longPollingTransport.Output.Complete();

                    await longPollingTransport.Running.OrTimeout();

                    await longPollingTransport.Input.ReadAllAsync().OrTimeout();
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// A payload returned by a poll is readable from the transport's input, and exactly three
        /// requests reach the handler before Running completes.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportDispatchesMessagesReceivedFromPoll()
        {
            var message1Payload = new[] { (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o' };

            var requests = 0;
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            var sentRequests = new List<HttpRequestMessage>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    sentRequests.Add(request);

                    await Task.Yield();

                    if (requests == 0)
                    {
                        requests++;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                    }
                    else if (requests == 1)
                    {
                        requests++;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK, message1Payload);
                    }

                    return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    // Start the transport
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    // Wait for the transport to finish
                    await longPollingTransport.Running.OrTimeout();

                    // Pull Messages out of the channel (bounded, like every other read in this class)
                    var message = await longPollingTransport.Input.ReadAllAsync().OrTimeout();

                    // Check the provided request
                    Assert.Equal(3, sentRequests.Count);

                    // Check the messages received
                    Assert.Equal(message1Payload, message);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// Two writes followed by a single flush reach the handler as one POST whose body is "HelloWorld".
        /// </summary>
        [Fact]
        public async Task LongPollingTransportSendsAvailableMessagesWhenTheyArrive()
        {
            var sentRequests = new List<byte[]>();
            var tcs = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
            var firstPoll = true;

            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    if (request.Method == HttpMethod.Post)
                    {
                        // Record the raw bytes of the request payload
                        sentRequests.Add(await request.Content.ReadAsByteArrayAsync());
                    }
                    else if (request.Method == HttpMethod.Get)
                    {
                        // First poll completes immediately
                        if (firstPoll)
                        {
                            firstPoll = false;
                            return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                        }

                        cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));

                        // This is the poll task
                        return await tcs.Task;
                    }
                    else if (request.Method == HttpMethod.Delete)
                    {
                        return ResponseUtils.CreateResponse(HttpStatusCode.Accepted);
                    }

                    return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    // Start the transport
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    longPollingTransport.Output.Write(Encoding.UTF8.GetBytes("Hello"));
                    longPollingTransport.Output.Write(Encoding.UTF8.GetBytes("World"));
                    await longPollingTransport.Output.FlushAsync();

                    longPollingTransport.Output.Complete();

                    await longPollingTransport.Running.OrTimeout();
                    await longPollingTransport.Input.ReadAllAsync().OrTimeout();

                    Assert.Single(sentRequests);
                    Assert.Equal(
                        new[]
                        {
                            (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o',
                            (byte)'W', (byte)'o', (byte)'r', (byte)'l', (byte)'d'
                        },
                        sentRequests[0]);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// On StopAsync the DELETE request reaches the handler only after the pending poll task has completed.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportSendsDeleteAfterPollEnds()
        {
            var sentRequests = new List<byte[]>();
            var pollTcs = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
            var deleteTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
            var firstPoll = true;

            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    if (request.Method == HttpMethod.Post)
                    {
                        // Record the raw bytes of the request payload
                        sentRequests.Add(await request.Content.ReadAsByteArrayAsync());
                    }
                    else if (request.Method == HttpMethod.Get)
                    {
                        // First poll completes immediately
                        if (firstPoll)
                        {
                            firstPoll = false;
                            return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                        }

                        cancellationToken.Register(() => pollTcs.TrySetCanceled(cancellationToken));

                        // This is the poll task
                        return await pollTcs.Task;
                    }
                    else if (request.Method == HttpMethod.Delete)
                    {
                        // The poll task should have been completed
                        Assert.True(pollTcs.Task.IsCompleted);

                        deleteTcs.TrySetResult(null);

                        return ResponseUtils.CreateResponse(HttpStatusCode.Accepted);
                    }

                    return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                // Start the transport
                await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                var task = longPollingTransport.StopAsync();

                await deleteTcs.Task.OrTimeout();

                await task.OrTimeout();
            }
        }

        /// <summary>
        /// StartAsync succeeds for each single transfer format value.
        /// </summary>
        [Theory]
        [InlineData(TransferFormat.Binary)]
        [InlineData(TransferFormat.Text)]
        public async Task LongPollingTransportSetsTransferFormat(TransferFormat transferFormat)
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();
                    return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, transferFormat);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// StartAsync rejects combined or undefined transfer format values with an exception whose
        /// ParamName is "transferFormat".
        /// </summary>
        [Theory]
        [InlineData(TransferFormat.Text | TransferFormat.Binary)] // Multiple values not allowed
        [InlineData((TransferFormat)42)] // Unexpected value
        public async Task LongPollingTransportThrowsForInvalidTransferFormat(TransferFormat transferFormat)
        {
            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();
                    return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                var exception = await Assert.ThrowsAsync<ArgumentException>(
                    () => longPollingTransport.StartAsync(TestUri, transferFormat));

                Assert.Contains($"The '{transferFormat}' transfer format is not supported by this transport.", exception.Message);
                Assert.Equal("transferFormat", exception.ParamName);
            }
        }

        /// <summary>
        /// The handler throws OperationCanceledException for two requests after the first; the test
        /// passes when a further request arrives before Running completes.
        /// </summary>
        [Fact]
        public async Task LongPollingTransportRePollsIfRequestCanceled()
        {
            var numPolls = 0;
            var completionTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

            var mockHttpHandler = new Mock<HttpMessageHandler>();
            mockHttpHandler.Protected()
                .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
                .Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
                {
                    await Task.Yield();

                    if (numPolls == 0)
                    {
                        numPolls++;
                        return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                    }

                    if (numPolls++ < 3)
                    {
                        throw new OperationCanceledException();
                    }

                    // The handler keeps being invoked after the first completion (later requests reach
                    // this line too), so use TrySetResult: SetResult would throw on the second call.
                    completionTcs.TrySetResult(null);
                    return ResponseUtils.CreateResponse(HttpStatusCode.OK);
                });

            using (var httpClient = new HttpClient(mockHttpHandler.Object))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                try
                {
                    await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                    var completedTask = await Task.WhenAny(completionTcs.Task, longPollingTransport.Running).OrTimeout();
                    Assert.Equal(completionTcs.Task, completedTask);
                }
                finally
                {
                    await longPollingTransport.StopAsync();
                }
            }
        }

        /// <summary>
        /// Stopping the transport results in a single DELETE request sent to the URI the transport was started with.
        /// </summary>
        [Fact]
        public async Task SendsDeleteRequestWhenTransportCompleted()
        {
            var handler = TestHttpMessageHandler.CreateDefault();

            using (var httpClient = new HttpClient(handler))
            {
                var longPollingTransport = new LongPollingTransport(httpClient);

                await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary);

                await longPollingTransport.StopAsync();

                var deleteRequest = handler.ReceivedRequests.SingleOrDefault(r => r.Method == HttpMethod.Delete);
                Assert.NotNull(deleteRequest);
                Assert.Equal(TestUri, deleteRequest.RequestUri);
            }
        }
    }
}