From eaa03679deba1b042dc03a829cf65934e2d87ca7 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Thu, 17 May 2018 18:31:56 +1200 Subject: [PATCH] Fix flakey tests from SSE receiving a 404 response on stop (#2282) --- .../Internal/SendUtils.cs | 4 +- .../Internal/ServerSentEventsTransport.cs | 12 +- .../ServerSentEventsTransportTests.cs | 106 ++++++++++++++++-- .../ServerFixture.cs | 2 +- .../VerifiableServerLoggedTest.cs | 25 ++--- 5 files changed, 118 insertions(+), 31 deletions(-) diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/SendUtils.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/SendUtils.cs index d3cff78214..2af7ab5ee0 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/SendUtils.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/SendUtils.cs @@ -15,7 +15,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal { internal static class SendUtils { - public static async Task SendMessages(Uri sendUrl, IDuplexPipe application, HttpClient httpClient, ILogger logger) + public static async Task SendMessages(Uri sendUrl, IDuplexPipe application, HttpClient httpClient, ILogger logger, CancellationToken cancellationToken = default) { Log.SendStarted(logger); @@ -49,7 +49,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal // rather than buffer the entire response. This gives a small perf boost. // Note that it is important to dispose of the response when doing this to // avoid leaving the connection open. - using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead)) + using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken)) { response.EnsureSuccessStatusCode(); } diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs index b92931a4d0..b0f3477a2b 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/Internal/ServerSentEventsTransport.cs @@ -81,14 +81,20 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal _transport = pair.Transport; _application = pair.Application; - Running = ProcessAsync(url, response); + // Cancellation token will be triggered when the pipe is stopped on the client. + // This is to avoid the client throwing from a 404 response caused by the + // server stopping the connection while the send message request is in progress. + var inputCts = new CancellationTokenSource(); + _application.Input.OnWriterCompleted((exception, state) => ((CancellationTokenSource)state).Cancel(), inputCts); + + Running = ProcessAsync(url, response, inputCts.Token); } - private async Task ProcessAsync(Uri url, HttpResponseMessage response) + private async Task ProcessAsync(Uri url, HttpResponseMessage response, CancellationToken inputCancellationToken) { // Start sending and polling (ask for binary if the server supports it) var receiving = ProcessEventStream(_application, response, _transportCts.Token); - var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger); + var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger, inputCancellationToken); // Wait for send or receive to complete var trigger = await Task.WhenAny(receiving, sending); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs index 1e572a944e..cc4ddc0b73 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs @@ -13,14 +13,21 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http.Connections.Client; using Microsoft.AspNetCore.Http.Connections.Client.Internal; +using Microsoft.AspNetCore.SignalR.Tests; +using Microsoft.Extensions.Logging.Testing; using Moq; using Moq.Protected; using Xunit; +using Xunit.Abstractions; namespace Microsoft.AspNetCore.SignalR.Client.Tests { - public class ServerSentEventsTransportTests + public class ServerSentEventsTransportTests : VerifiableLoggedTest { + public ServerSentEventsTransportTests(ITestOutputHelper output) : base(output) + { + } + [Fact] public async Task CanStartStopSSETransport() { @@ -42,14 +49,15 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests .Setup(s => s.CopyToAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Returns(copyToAsyncTcs.Task); mockStream.Setup(s => s.CanRead).Returns(true); - return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) }; + return new HttpResponseMessage {Content = new StreamContent(mockStream.Object)}; }); try { using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory)) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); await sseTransport.StartAsync( new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); @@ -89,12 +97,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests return Task.FromResult(new HttpResponseMessage { Content = new StreamContent(mockStream.Object) }); }); - Task transportActiveTask; - using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory)) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); + Task transportActiveTask; try { await sseTransport.StartAsync( @@ -138,8 +146,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests }); using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory)) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); await sseTransport.StartAsync( new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); @@ -155,6 +164,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests [Fact] public async Task SSETransportStopsWithErrorIfSendingMessageFails() { + bool ExpectedErrors(WriteContext writeContext) + { + return writeContext.LoggerName == typeof(ServerSentEventsTransport).FullName && + writeContext.EventId.Name == "ErrorSending"; + } + var eventStreamTcs = new TaskCompletionSource(); var copyToAsyncTcs = new TaskCompletionSource(); @@ -183,8 +198,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests }); using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory, expectedErrorsFilter: ExpectedErrors)) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); await sseTransport.StartAsync( new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); @@ -226,8 +242,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests }); using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory)) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); await sseTransport.StartAsync( new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); @@ -252,8 +269,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests }); using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory)) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); await sseTransport.StartAsync( new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); @@ -265,6 +283,68 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests } } + [Fact] + public async Task SSETransportCancelsSendOnStop() + { + var eventStreamTcs = new TaskCompletionSource(); + var copyToAsyncTcs = new TaskCompletionSource(); + var sendSyncPoint = new SyncPoint(); + + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + + if (request.Headers.Accept?.Contains(new MediaTypeWithQualityHeaderValue("text/event-stream")) == true) + { + // Receive loop started - allow stopping the transport + eventStreamTcs.SetResult(null); + + // returns unfinished task to block pipelines + var mockStream = new Mock(); + mockStream + .Setup(s => s.CopyToAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (stream, bufferSize, t) => + { + await copyToAsyncTcs.Task; + + throw new TaskCanceledException(); + }); + mockStream.Setup(s => s.CanRead).Returns(true); + return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) }; + } + + // Throw TaskCanceledException from SSE send's SendAsync on stop + cancellationToken.Register(s => ((SyncPoint)s).Continue(), sendSyncPoint); + await sendSyncPoint.WaitToContinue(); + throw new TaskCanceledException(); + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory)) + { + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); + + await sseTransport.StartAsync( + new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout(); + await eventStreamTcs.Task; + + await sseTransport.Output.WriteAsync(new byte[] { 0x42 }); + + // For for send request to be in progress + await sendSyncPoint.WaitForSyncPoint(); + + var stopTask = sseTransport.StopAsync(); + + copyToAsyncTcs.SetResult(null); + sendSyncPoint.Continue(); + + await stopTask; + } + } + [Fact] public async Task SSETransportDoesNotSupportBinary() { @@ -278,8 +358,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests }); using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory)) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); var ex = await Assert.ThrowsAsync(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary).OrTimeout()); Assert.Equal($"The 'Binary' transfer format is not supported by this transport.{Environment.NewLine}Parameter name: transferFormat", ex.Message); @@ -302,8 +383,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests }); using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog(out var loggerFactory, $"{nameof(SSETransportThrowsForInvalidTransferFormat)}_{transferFormat}")) { - var sseTransport = new ServerSentEventsTransport(httpClient); + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory); var exception = await Assert.ThrowsAsync(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat)); diff --git a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs index c4338ea723..9460eef0dd 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs @@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests { var testLog = AssemblyTestLog.ForAssembly(typeof(TStartup).Assembly); _logToken = testLog.StartTestLog(null, $"{nameof(ServerFixture)}_{typeof(TStartup).Name}", - out _loggerFactory, "ServerFixture"); + out _loggerFactory, nameof(ServerFixture)); } else { diff --git a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/VerifiableServerLoggedTest.cs b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/VerifiableServerLoggedTest.cs index c31cd0650f..9d5f883470 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/VerifiableServerLoggedTest.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/VerifiableServerLoggedTest.cs @@ -5,13 +5,14 @@ using System; using System.Net.Http; using System.Net.WebSockets; using System.Runtime.CompilerServices; +using System.Threading; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Testing; using Xunit.Abstractions; namespace Microsoft.AspNetCore.SignalR.Tests { - public class VerifiableServerLoggedTest : VerifiableLoggedTest + public class VerifiableServerLoggedTest : VerifiableLoggedTest, IDisposable { private readonly Func _globalExpectedErrorsFilter; @@ -21,18 +22,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests { ServerFixture = serverFixture; - _globalExpectedErrorsFilter = (writeContext) => - { - // Suppress https://github.com/aspnet/SignalR/issues/2034 - if (writeContext.LoggerName == "Microsoft.AspNetCore.Http.Connections.Client.Internal.ServerSentEventsTransport" && - writeContext.Message.StartsWith("Error while sending to") && - writeContext.Exception is HttpRequestException) - { - return true; - } - - return false; - }; + // Suppress errors globally here + _globalExpectedErrorsFilter = (writeContext) => false; } private Func ResolveExpectedErrorsFilter(Func expectedErrorsFilter) @@ -64,5 +55,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests var disposable = base.StartVerifiableLog(out loggerFactory, testName, ResolveExpectedErrorsFilter(expectedErrorsFilter)); return new ServerLogScope(ServerFixture, loggerFactory, disposable); } + + public void Dispose() + { + // Unit tests in a fixture reuse the server. + // A small delay prevents server logging from a previous tests from showing up in the next test's logs + // by giving the server time to finish any in-progress request logic. + Thread.Sleep(TimeSpan.FromMilliseconds(100)); + } } } \ No newline at end of file