Revert "Fix flaky SSE test (#1279)"

This reverts commit 6fcf554c23.
This commit is contained in:
BrennanConroy 2018-02-16 19:01:54 -08:00
parent a513386f68
commit 3acd29ec6f
3 changed files with 59 additions and 64 deletions

View File

@ -1,8 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Threading;
namespace System.IO.Pipelines
{
internal class StreamPipeConnection : IDuplexPipe
@ -23,7 +21,7 @@ namespace System.IO.Pipelines
Output.Complete();
}
public static PipeReader CreateReader(PipeOptions options, Stream stream, CancellationToken cancellationToken = default)
public static PipeReader CreateReader(PipeOptions options, Stream stream)
{
if (!stream.CanRead)
{
@ -31,7 +29,7 @@ namespace System.IO.Pipelines
}
var pipe = new Pipe(options);
var ignore = stream.CopyToEndAsync(pipe.Writer, cancellationToken);
var ignore = stream.CopyToEndAsync(pipe.Writer);
return pipe.Reader;
}

View File

@ -82,65 +82,60 @@ namespace Microsoft.AspNetCore.Sockets.Client
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
var stream = await response.Content.ReadAsStreamAsync();
var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream, cancellationToken);
var readCancellationRegistration = cancellationToken.Register(
reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader);
try
using (var stream = await response.Content.ReadAsStreamAsync())
{
while (true)
{
var result = await pipelineReader.ReadAsync();
var input = result.Buffer;
if (result.IsCancelled || (input.IsEmpty && result.IsCompleted))
{
_logger.EventStreamEnded();
break;
}
var consumed = input.Start;
var examined = input.End;
try
{
var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer);
switch (parseResult)
{
case ServerSentEventsMessageParser.ParseResult.Completed:
await _application.Output.WriteAsync(buffer);
_parser.Reset();
break;
case ServerSentEventsMessageParser.ParseResult.Incomplete:
if (result.IsCompleted)
{
throw new FormatException("Incomplete message.");
}
break;
}
}
finally
{
pipelineReader.AdvanceTo(consumed, examined);
}
}
}
catch (OperationCanceledException)
{
_logger.ReceiveCanceled();
}
finally
{
readCancellationRegistration.Dispose();
_transportCts.Cancel();
var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream);
var readCancellationRegistration = cancellationToken.Register(
reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader);
try
{
stream.Dispose();
while (true)
{
var result = await pipelineReader.ReadAsync();
var input = result.Buffer;
if (result.IsCancelled || (input.IsEmpty && result.IsCompleted))
{
_logger.EventStreamEnded();
break;
}
var consumed = input.Start;
var examined = input.End;
try
{
var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer);
switch (parseResult)
{
case ServerSentEventsMessageParser.ParseResult.Completed:
await _application.Output.WriteAsync(buffer);
_parser.Reset();
break;
case ServerSentEventsMessageParser.ParseResult.Incomplete:
if (result.IsCompleted)
{
throw new FormatException("Incomplete message.");
}
break;
}
}
finally
{
pipelineReader.AdvanceTo(consumed, examined);
}
}
}
catch (OperationCanceledException)
{
_logger.ReceiveCanceled();
}
finally
{
readCancellationRegistration.Dispose();
_transportCts.Cancel();
_logger.ReceiveStopped();
}
// workaround issue with a null-ref in 2.0
catch { }
_logger.ReceiveStopped();
}
}

View File

@ -65,9 +65,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
[Fact(Skip = "Flaky tests keep failing")]
public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped()
{
var eventStreamCts = new CancellationTokenSource();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
@ -76,13 +77,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, token) =>
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
{
await Task.Yield();
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
while (!token.IsCancellationRequested)
while (!eventStreamCts.IsCancellationRequested)
{
await stream.WriteAsync(buffer, 0, buffer.Length, token).OrTimeout();
await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout();
}
});
mockStream.Setup(s => s.CanRead).Returns(true);
@ -106,7 +107,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
transportActiveTask = sseTransport.Running;
Assert.False(transportActiveTask.IsCompleted);
var message = await pair.Transport.Input.ReadSingleAsync().OrTimeout();
Assert.StartsWith("3:abc", Encoding.ASCII.GetString(message));
Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
}
finally
{
@ -114,6 +115,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
await transportActiveTask.OrTimeout();
eventStreamCts.Cancel();
}
}