diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs index b600a85a4a..71e6febc31 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Threading; + namespace System.IO.Pipelines { internal class StreamPipeConnection : IDuplexPipe @@ -21,7 +23,7 @@ namespace System.IO.Pipelines Output.Complete(); } - public static PipeReader CreateReader(PipeOptions options, Stream stream) + public static PipeReader CreateReader(PipeOptions options, Stream stream, CancellationToken cancellationToken = default) { if (!stream.CanRead) { @@ -29,7 +31,7 @@ namespace System.IO.Pipelines } var pipe = new Pipe(options); - var ignore = stream.CopyToEndAsync(pipe.Writer); + var ignore = stream.CopyToEndAsync(pipe.Writer, cancellationToken); return pipe.Reader; } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs index f0b6c0be7c..d42830466e 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs @@ -82,60 +82,65 @@ namespace Microsoft.AspNetCore.Sockets.Client request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream")); var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); - using (var stream = await response.Content.ReadAsStreamAsync()) + var stream = await response.Content.ReadAsStreamAsync(); + var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream); + + var readCancellationRegistration = cancellationToken.Register( + reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader); + try { - var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream); - var readCancellationRegistration = cancellationToken.Register( - reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader); - try + while (true) { - while (true) + var result = await pipelineReader.ReadAsync(); + var input = result.Buffer; + if (result.IsCancelled || (input.IsEmpty && result.IsCompleted)) { - var result = await pipelineReader.ReadAsync(); - var input = result.Buffer; - if (result.IsCancelled || (input.IsEmpty && result.IsCompleted)) - { - _logger.EventStreamEnded(); - break; - } + _logger.EventStreamEnded(); + break; + } - var consumed = input.Start; - var examined = input.End; + var consumed = input.Start; + var examined = input.End; - try - { - var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer); + try + { + var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer); - switch (parseResult) - { - case ServerSentEventsMessageParser.ParseResult.Completed: - await _application.Output.WriteAsync(buffer); - _parser.Reset(); - break; - case ServerSentEventsMessageParser.ParseResult.Incomplete: - if (result.IsCompleted) - { - throw new FormatException("Incomplete message."); - } - break; - } - } - finally + switch (parseResult) { - pipelineReader.AdvanceTo(consumed, examined); + case ServerSentEventsMessageParser.ParseResult.Completed: + await _application.Output.WriteAsync(buffer); + _parser.Reset(); + break; + case ServerSentEventsMessageParser.ParseResult.Incomplete: + if (result.IsCompleted) + { + throw new FormatException("Incomplete message."); + } + break; } } + finally + { + pipelineReader.AdvanceTo(consumed, examined); + } } - catch (OperationCanceledException) + } + catch (OperationCanceledException) + { + _logger.ReceiveCanceled(); + } + finally + { + readCancellationRegistration.Dispose(); + _transportCts.Cancel(); + try { - _logger.ReceiveCanceled(); - } - finally - { - readCancellationRegistration.Dispose(); - _transportCts.Cancel(); - _logger.ReceiveStopped(); + stream.Dispose(); } + // workaround issue with a null-ref in 2.0 + catch { } + _logger.ReceiveStopped(); } } diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs index 22a89d5e3e..63390d8b29 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs @@ -65,10 +65,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests } } - [Fact(Skip = "Flaky tests keep failing")] + [Fact] public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped() { - var eventStreamCts = new CancellationTokenSource(); var mockHttpHandler = new Mock(); mockHttpHandler.Protected() .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) @@ -77,13 +76,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var mockStream = new Mock(); mockStream .Setup(s => s.CopyToAsync(It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(async (stream, bufferSize, t) => + .Returns(async (stream, bufferSize, token) => { await Task.Yield(); var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n"); - while (!eventStreamCts.IsCancellationRequested) + while (!token.IsCancellationRequested) { - await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout(); + await stream.WriteAsync(buffer, 0, buffer.Length, token).OrTimeout(); } }); mockStream.Setup(s => s.CanRead).Returns(true); @@ -107,7 +106,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests transportActiveTask = sseTransport.Running; Assert.False(transportActiveTask.IsCompleted); var message = await pair.Transport.Input.ReadSingleAsync().OrTimeout(); - Assert.Equal("3:abc", Encoding.ASCII.GetString(message)); + Assert.StartsWith("3:abc", Encoding.ASCII.GetString(message)); } finally { @@ -115,7 +114,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests } await transportActiveTask.OrTimeout(); - eventStreamCts.Cancel(); } }