Fix flakey tests from SSE receiving a 404 response on stop (#2282)

This commit is contained in:
James Newton-King 2018-05-17 18:31:56 +12:00 committed by GitHub
parent cd8f238f83
commit eaa03679de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 118 additions and 31 deletions

View File

@ -15,7 +15,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
internal static class SendUtils
{
public static async Task SendMessages(Uri sendUrl, IDuplexPipe application, HttpClient httpClient, ILogger logger)
public static async Task SendMessages(Uri sendUrl, IDuplexPipe application, HttpClient httpClient, ILogger logger, CancellationToken cancellationToken = default)
{
Log.SendStarted(logger);
@ -49,7 +49,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
// rather than buffer the entire response. This gives a small perf boost.
// Note that it is important to dispose of the response when doing this to
// avoid leaving the connection open.
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken))
{
response.EnsureSuccessStatusCode();
}

View File

@ -81,14 +81,20 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
_transport = pair.Transport;
_application = pair.Application;
Running = ProcessAsync(url, response);
// Cancellation token will be triggered when the pipe is stopped on the client.
// This is to avoid the client throwing from a 404 response caused by the
// server stopping the connection while the send message request is in progress.
var inputCts = new CancellationTokenSource();
_application.Input.OnWriterCompleted((exception, state) => ((CancellationTokenSource)state).Cancel(), inputCts);
Running = ProcessAsync(url, response, inputCts.Token);
}
private async Task ProcessAsync(Uri url, HttpResponseMessage response)
private async Task ProcessAsync(Uri url, HttpResponseMessage response, CancellationToken inputCancellationToken)
{
// Start sending and polling (ask for binary if the server supports it)
var receiving = ProcessEventStream(_application, response, _transportCts.Token);
var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger);
var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger, inputCancellationToken);
// Wait for send or receive to complete
var trigger = await Task.WhenAny(receiving, sending);

View File

@ -13,14 +13,21 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging.Testing;
using Moq;
using Moq.Protected;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public class ServerSentEventsTransportTests
public class ServerSentEventsTransportTests : VerifiableLoggedTest
{
public ServerSentEventsTransportTests(ITestOutputHelper output) : base(output)
{
}
[Fact]
public async Task CanStartStopSSETransport()
{
@ -42,14 +49,15 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns(copyToAsyncTcs.Task);
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
return new HttpResponseMessage {Content = new StreamContent(mockStream.Object)};
});
try
{
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
@ -89,12 +97,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
return Task.FromResult(new HttpResponseMessage { Content = new StreamContent(mockStream.Object) });
});
Task transportActiveTask;
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
Task transportActiveTask;
try
{
await sseTransport.StartAsync(
@ -138,8 +146,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
@ -155,6 +164,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact]
public async Task SSETransportStopsWithErrorIfSendingMessageFails()
{
bool ExpectedErrors(WriteContext writeContext)
{
return writeContext.LoggerName == typeof(ServerSentEventsTransport).FullName &&
writeContext.EventId.Name == "ErrorSending";
}
var eventStreamTcs = new TaskCompletionSource<object>();
var copyToAsyncTcs = new TaskCompletionSource<int>();
@ -183,8 +198,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory, expectedErrorsFilter: ExpectedErrors))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
@ -226,8 +242,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
@ -252,8 +269,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
@ -265,6 +283,68 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task SSETransportCancelsSendOnStop()
{
var eventStreamTcs = new TaskCompletionSource<object>();
var copyToAsyncTcs = new TaskCompletionSource<object>();
var sendSyncPoint = new SyncPoint();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (request.Headers.Accept?.Contains(new MediaTypeWithQualityHeaderValue("text/event-stream")) == true)
{
// Receive loop started - allow stopping the transport
eventStreamTcs.SetResult(null);
// returns unfinished task to block pipelines
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
{
await copyToAsyncTcs.Task;
throw new TaskCanceledException();
});
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
}
// Throw TaskCanceledException from SSE send's SendAsync on stop
cancellationToken.Register(s => ((SyncPoint)s).Continue(), sendSyncPoint);
await sendSyncPoint.WaitToContinue();
throw new TaskCanceledException();
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory))
{
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), TransferFormat.Text).OrTimeout();
await eventStreamTcs.Task;
await sseTransport.Output.WriteAsync(new byte[] { 0x42 });
// For for send request to be in progress
await sendSyncPoint.WaitForSyncPoint();
var stopTask = sseTransport.StopAsync();
copyToAsyncTcs.SetResult(null);
sendSyncPoint.Continue();
await stopTask;
}
}
[Fact]
public async Task SSETransportDoesNotSupportBinary()
{
@ -278,8 +358,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
var ex = await Assert.ThrowsAsync<ArgumentException>(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary).OrTimeout());
Assert.Equal($"The 'Binary' transfer format is not supported by this transport.{Environment.NewLine}Parameter name: transferFormat", ex.Message);
@ -302,8 +383,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (StartVerifiableLog(out var loggerFactory, $"{nameof(SSETransportThrowsForInvalidTransferFormat)}_{transferFormat}"))
{
var sseTransport = new ServerSentEventsTransport(httpClient);
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory);
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
sseTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat));

View File

@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
var testLog = AssemblyTestLog.ForAssembly(typeof(TStartup).Assembly);
_logToken = testLog.StartTestLog(null, $"{nameof(ServerFixture<TStartup>)}_{typeof(TStartup).Name}",
out _loggerFactory, "ServerFixture");
out _loggerFactory, nameof(ServerFixture));
}
else
{

View File

@ -5,13 +5,14 @@ using System;
using System.Net.Http;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Tests
{
public class VerifiableServerLoggedTest : VerifiableLoggedTest
public class VerifiableServerLoggedTest : VerifiableLoggedTest, IDisposable
{
private readonly Func<WriteContext, bool> _globalExpectedErrorsFilter;
@ -21,18 +22,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
ServerFixture = serverFixture;
_globalExpectedErrorsFilter = (writeContext) =>
{
// Suppress https://github.com/aspnet/SignalR/issues/2034
if (writeContext.LoggerName == "Microsoft.AspNetCore.Http.Connections.Client.Internal.ServerSentEventsTransport" &&
writeContext.Message.StartsWith("Error while sending to") &&
writeContext.Exception is HttpRequestException)
{
return true;
}
return false;
};
// Suppress errors globally here
_globalExpectedErrorsFilter = (writeContext) => false;
}
private Func<WriteContext, bool> ResolveExpectedErrorsFilter(Func<WriteContext, bool> expectedErrorsFilter)
@ -64,5 +55,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var disposable = base.StartVerifiableLog(out loggerFactory, testName, ResolveExpectedErrorsFilter(expectedErrorsFilter));
return new ServerLogScope(ServerFixture, loggerFactory, disposable);
}
public void Dispose()
{
// Unit tests in a fixture reuse the server.
// A small delay prevents server logging from a previous tests from showing up in the next test's logs
// by giving the server time to finish any in-progress request logic.
Thread.Sleep(TimeSpan.FromMilliseconds(100));
}
}
}