diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs index ecae0d7d3c..80d6bbecf3 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs @@ -190,6 +190,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Http { Log.SkippingDispose(_logger); } + + _httpClient?.Dispose(); } finally { diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/LongPollingTransport.cs index 72b5bebf04..a83497dbe3 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/LongPollingTransport.cs @@ -21,8 +21,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal private readonly HttpOptions _httpOptions; private readonly ILogger _logger; private IDuplexPipe _application; - private Task _sender; - private Task _poller; + // Volatile so that the poll loop sees the updated value set from a different thread + private volatile Exception _error; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); @@ -52,40 +52,54 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal Log.StartTransport(_logger, transferFormat); - // Start sending and polling (ask for binary if the server supports it) - _poller = Poll(url, _transportCts.Token); - _sender = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _transportCts, _logger); - - Running = Task.WhenAll(_sender, _poller).ContinueWith(t => - { - Log.TransportStopped(_logger, t.Exception?.InnerException); - _application.Output.Complete(t.Exception?.InnerException); - _application.Input.Complete(); - return t; - }).Unwrap(); + Running = ProcessAsync(url); return Task.CompletedTask; } + private async Task ProcessAsync(Uri url) + { + // Start sending and polling (ask for binary if the server supports it) + var receiving = Poll(url, _transportCts.Token); + var sending = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _logger); + + // Wait for send or receive to complete + var trigger = await Task.WhenAny(receiving, sending); + + if (trigger == receiving) + { + // We're waiting for the application to finish and there are 2 things it could be doing + // 1. Waiting for application data + // 2. Waiting for an outgoing send (this should be instantaneous) + + // Cancel the application so that ReadAsync yields + _application.Input.CancelPendingRead(); + } + else + { + // Set the sending error so we communicate that to the application + _error = sending.IsFaulted ? sending.Exception.InnerException : null; + + _transportCts.Cancel(); + + // Cancel any pending flush so that we can quit + _application.Output.CancelPendingFlush(); + } + } + public async Task StopAsync() { Log.TransportStopping(_logger); - _transportCts.Cancel(); + _application.Input.CancelPendingRead(); - try - { - await Running; - } - catch - { - // exceptions have been handled in the Running task continuation by closing the channel with the exception - } + await Running; } private async Task Poll(Uri pollUrl, CancellationToken cancellationToken) { Log.StartReceive(_logger); + try { while (!cancellationToken.IsCancellationRequested) @@ -124,6 +138,14 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal var stream = new PipeWriterStream(_application.Output); await response.Content.CopyToAsync(stream); + var flushResult = await _application.Output.FlushAsync(); + + // We canceled in the middle of applying back pressure + // or if the consumer is done + if (flushResult.IsCanceled || flushResult.IsCompleted) + { + break; + } } } } @@ -135,12 +157,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal catch (Exception ex) { Log.ErrorPolling(_logger, pollUrl, ex); - throw; + + _error = ex; } finally { - // Make sure the send loop is terminated - _transportCts.Cancel(); + _application.Output.Complete(_error); + Log.ReceiveStopped(_logger); } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/ServerSentEventsTransport.cs index 0146bdfa11..e0c4041a43 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/ServerSentEventsTransport.cs @@ -20,6 +20,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal private readonly HttpClient _httpClient; private readonly HttpOptions _httpOptions; private readonly ILogger _logger; + + // Volatile so that the SSE loop sees the updated value set from a different thread + private volatile Exception _error; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser(); @@ -55,21 +58,42 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal Log.StartTransport(_logger, transferFormat); var startTcs = new TaskCompletionSource(TaskContinuationOptions.RunContinuationsAsynchronously); - var sendTask = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _transportCts, _logger); - var receiveTask = OpenConnection(_application, url, startTcs, _transportCts.Token); - Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t => - { - Log.TransportStopped(_logger, t.Exception?.InnerException); - _application.Output.Complete(t.Exception?.InnerException); - _application.Input.Complete(); - - return t; - }).Unwrap(); + Running = ProcessAsync(url, startTcs); return startTcs.Task; } + private async Task ProcessAsync(Uri url, TaskCompletionSource startTcs) + { + // Start sending and polling (ask for binary if the server supports it) + var receiving = OpenConnection(_application, url, startTcs, _transportCts.Token); + var sending = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _logger); + + // Wait for send or receive to complete + var trigger = await Task.WhenAny(receiving, sending); + + if (trigger == receiving) + { + // We're waiting for the application to finish and there are 2 things it could be doing + // 1. Waiting for application data + // 2. Waiting for an outgoing send (this should be instantaneous) + + // Cancel the application so that ReadAsync yields + _application.Input.CancelPendingRead(); + } + else + { + // Set the sending error so we communicate that to the application + _error = sending.IsFaulted ? sending.Exception.InnerException : null; + + _transportCts.Cancel(); + + // Cancel any pending flush so that we can quit + _application.Output.CancelPendingFlush(); + } + } + private async Task OpenConnection(IDuplexPipe application, Uri url, TaskCompletionSource startTcs, CancellationToken cancellationToken) { Log.StartReceive(_logger); @@ -78,7 +102,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal SendUtils.PrepareHttpRequest(request, _httpOptions); request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream")); - HttpResponseMessage response; + HttpResponseMessage response = null; + try { response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); @@ -87,11 +112,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal } catch (Exception ex) { + response?.Dispose(); Log.TransportStopping(_logger); startTcs.TrySetException(ex); return; } + using (response) using (var stream = await response.Content.ReadAsStreamAsync()) { var pipeOptions = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0); @@ -116,12 +143,15 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal { Log.ParsingSSE(_logger, input.Length); var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer); + FlushResult flushResult = default; switch (parseResult) { case ServerSentEventsMessageParser.ParseResult.Completed: Log.MessageToApp(_logger, buffer.Length); - await _application.Output.WriteAsync(buffer); + + flushResult = await _application.Output.WriteAsync(buffer); + _parser.Reset(); break; case ServerSentEventsMessageParser.ParseResult.Incomplete: @@ -131,6 +161,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal } break; } + + // We canceled in the middle of applying back pressure + // or if the consumer is done + if (flushResult.IsCanceled || flushResult.IsCompleted) + { + break; + } } finally { @@ -142,10 +179,16 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal { Log.ReceiveCanceled(_logger); } + catch (Exception ex) + { + _error = ex; + } finally { + _application.Output.Complete(_error); + readCancellationRegistration.Dispose(); - _transportCts.Cancel(); + Log.ReceiveStopped(_logger); } } @@ -154,16 +197,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal public async Task StopAsync() { Log.TransportStopping(_logger); - _transportCts.Cancel(); - try - { - await Running; - } - catch - { - // exceptions have been handled in the Running task continuation by closing the channel with the exception - } + _application.Input.CancelPendingRead(); + + await Running; } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs index b8b8e31060..f6e3f24f35 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs @@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.Sockets.Client internal static class SendUtils { public static async Task SendMessages(Uri sendUrl, IDuplexPipe application, HttpClient httpClient, - HttpOptions httpOptions, CancellationTokenSource transportCts, ILogger logger) + HttpOptions httpOptions, ILogger logger) { Log.SendStarted(logger); @@ -25,14 +25,17 @@ namespace Microsoft.AspNetCore.Sockets.Client { while (true) { - var result = await application.Input.ReadAsync(transportCts.Token); + var result = await application.Input.ReadAsync(); var buffer = result.Buffer; try { - // Grab as many messages as we can from the pipe + if (result.IsCanceled) + { + Log.SendCanceled(logger); + break; + } - transportCts.Token.ThrowIfCancellationRequested(); if (!buffer.IsEmpty) { Log.SendingMessages(logger, buffer.Length, sendUrl); @@ -45,8 +48,14 @@ namespace Microsoft.AspNetCore.Sockets.Client request.Content = new ReadOnlySequenceContent(buffer); - var response = await httpClient.SendAsync(request, transportCts.Token); - response.EnsureSuccessStatusCode(); + // ResponseHeadersRead instructs SendAsync to return once headers are read + // rather than buffer the entire response. This gives a small perf boost. + // Note that it is important to dispose of the response when doing this to + // avoid leaving the connection open. + using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead)) + { + response.EnsureSuccessStatusCode(); + } Log.SentSuccessfully(logger); } @@ -76,8 +85,7 @@ namespace Microsoft.AspNetCore.Sockets.Client } finally { - // Make sure the poll loop is terminated - transportCts.Cancel(); + application.Input.Complete(); } Log.SendStopped(logger); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs index 2c62cd6ba6..193f741125 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs @@ -210,7 +210,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World")); - await Assert.ThrowsAsync(async () => await longPollingTransport.Running.OrTimeout()); + await longPollingTransport.Running.OrTimeout(); var exception = await Assert.ThrowsAsync(async () => await pair.Transport.Input.ReadAllAsync().OrTimeout()); Assert.Contains(" 500 ", exception.Message); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs index 833693082f..2bc9df7dd9 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs @@ -154,7 +154,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests await sseTransport.StartAsync( new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of()).OrTimeout(); - var exception = await Assert.ThrowsAsync(() => sseTransport.Running.OrTimeout()); + var exception = await Assert.ThrowsAsync(() => pair.Transport.Input.ReadAllAsync()); + + await sseTransport.Running.OrTimeout(); + Assert.Equal("Incomplete message.", exception.Message); } } @@ -204,7 +207,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var exception = await Assert.ThrowsAsync(() => pair.Transport.Input.ReadAllAsync().OrTimeout()); Assert.Contains("500", exception.Message); - Assert.Same(exception, await Assert.ThrowsAsync(() => sseTransport.Running.OrTimeout())); + // Errors are only communicated through the pipe + await sseTransport.Running.OrTimeout(); } }