Fix flaky SSE test (#1279)

This commit is contained in:
BrennanConroy 2018-02-15 09:56:02 -08:00 committed by GitHub
parent 78612fc4b0
commit 6fcf554c23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 51 deletions

View File

@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Threading;
namespace System.IO.Pipelines
{
internal class StreamPipeConnection : IDuplexPipe
@ -21,7 +23,7 @@ namespace System.IO.Pipelines
Output.Complete();
}
public static PipeReader CreateReader(PipeOptions options, Stream stream)
public static PipeReader CreateReader(PipeOptions options, Stream stream, CancellationToken cancellationToken = default)
{
if (!stream.CanRead)
{
@ -29,7 +31,7 @@ namespace System.IO.Pipelines
}
var pipe = new Pipe(options);
var ignore = stream.CopyToEndAsync(pipe.Writer);
var ignore = stream.CopyToEndAsync(pipe.Writer, cancellationToken);
return pipe.Reader;
}

View File

@ -82,60 +82,65 @@ namespace Microsoft.AspNetCore.Sockets.Client
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
using (var stream = await response.Content.ReadAsStreamAsync())
var stream = await response.Content.ReadAsStreamAsync();
var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream);
var readCancellationRegistration = cancellationToken.Register(
reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader);
try
{
var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream);
var readCancellationRegistration = cancellationToken.Register(
reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader);
try
while (true)
{
while (true)
var result = await pipelineReader.ReadAsync();
var input = result.Buffer;
if (result.IsCancelled || (input.IsEmpty && result.IsCompleted))
{
var result = await pipelineReader.ReadAsync();
var input = result.Buffer;
if (result.IsCancelled || (input.IsEmpty && result.IsCompleted))
{
_logger.EventStreamEnded();
break;
}
_logger.EventStreamEnded();
break;
}
var consumed = input.Start;
var examined = input.End;
var consumed = input.Start;
var examined = input.End;
try
{
var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer);
try
{
var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer);
switch (parseResult)
{
case ServerSentEventsMessageParser.ParseResult.Completed:
await _application.Output.WriteAsync(buffer);
_parser.Reset();
break;
case ServerSentEventsMessageParser.ParseResult.Incomplete:
if (result.IsCompleted)
{
throw new FormatException("Incomplete message.");
}
break;
}
}
finally
switch (parseResult)
{
pipelineReader.AdvanceTo(consumed, examined);
case ServerSentEventsMessageParser.ParseResult.Completed:
await _application.Output.WriteAsync(buffer);
_parser.Reset();
break;
case ServerSentEventsMessageParser.ParseResult.Incomplete:
if (result.IsCompleted)
{
throw new FormatException("Incomplete message.");
}
break;
}
}
finally
{
pipelineReader.AdvanceTo(consumed, examined);
}
}
catch (OperationCanceledException)
}
catch (OperationCanceledException)
{
_logger.ReceiveCanceled();
}
finally
{
readCancellationRegistration.Dispose();
_transportCts.Cancel();
try
{
_logger.ReceiveCanceled();
}
finally
{
readCancellationRegistration.Dispose();
_transportCts.Cancel();
_logger.ReceiveStopped();
stream.Dispose();
}
// workaround issue with a null-ref in 2.0
catch { }
_logger.ReceiveStopped();
}
}

View File

@ -65,10 +65,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact(Skip = "Flaky tests keep failing")]
[Fact]
public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped()
{
var eventStreamCts = new CancellationTokenSource();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
@ -77,13 +76,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, token) =>
{
await Task.Yield();
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
while (!eventStreamCts.IsCancellationRequested)
while (!token.IsCancellationRequested)
{
await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout();
await stream.WriteAsync(buffer, 0, buffer.Length, token).OrTimeout();
}
});
mockStream.Setup(s => s.CanRead).Returns(true);
@ -107,7 +106,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
transportActiveTask = sseTransport.Running;
Assert.False(transportActiveTask.IsCompleted);
var message = await pair.Transport.Input.ReadSingleAsync().OrTimeout();
Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
Assert.StartsWith("3:abc", Encoding.ASCII.GetString(message));
}
finally
{
@ -115,7 +114,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
await transportActiveTask.OrTimeout();
eventStreamCts.Cancel();
}
}