Merge pull request #1783 from aspnet/release/2.1

Release/2.1
This commit is contained in:
David Fowler 2018-03-30 02:33:28 -07:00 committed by GitHub
commit c29c1b3ec6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 132 additions and 58 deletions

View File

@ -190,6 +190,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Http
{
Log.SkippingDispose(_logger);
}
_httpClient?.Dispose();
}
finally
{

View File

@ -21,8 +21,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
private readonly HttpOptions _httpOptions;
private readonly ILogger _logger;
private IDuplexPipe _application;
private Task _sender;
private Task _poller;
// Volatile so that the poll loop sees the updated value set from a different thread
private volatile Exception _error;
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
@ -52,40 +52,54 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
Log.StartTransport(_logger, transferFormat);
// Start sending and polling (ask for binary if the server supports it)
_poller = Poll(url, _transportCts.Token);
_sender = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _transportCts, _logger);
Running = Task.WhenAll(_sender, _poller).ContinueWith(t =>
{
Log.TransportStopped(_logger, t.Exception?.InnerException);
_application.Output.Complete(t.Exception?.InnerException);
_application.Input.Complete();
return t;
}).Unwrap();
Running = ProcessAsync(url);
return Task.CompletedTask;
}
private async Task ProcessAsync(Uri url)
{
// Start sending and polling (ask for binary if the server supports it)
var receiving = Poll(url, _transportCts.Token);
var sending = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _logger);
// Wait for send or receive to complete
var trigger = await Task.WhenAny(receiving, sending);
if (trigger == receiving)
{
// We're waiting for the application to finish and there are 2 things it could be doing
// 1. Waiting for application data
// 2. Waiting for an outgoing send (this should be instantaneous)
// Cancel the application so that ReadAsync yields
_application.Input.CancelPendingRead();
}
else
{
// Set the sending error so we communicate that to the application
_error = sending.IsFaulted ? sending.Exception.InnerException : null;
_transportCts.Cancel();
// Cancel any pending flush so that we can quit
_application.Output.CancelPendingFlush();
}
}
public async Task StopAsync()
{
Log.TransportStopping(_logger);
_transportCts.Cancel();
_application.Input.CancelPendingRead();
try
{
await Running;
}
catch
{
// exceptions have been handled in the Running task continuation by closing the channel with the exception
}
await Running;
}
private async Task Poll(Uri pollUrl, CancellationToken cancellationToken)
{
Log.StartReceive(_logger);
try
{
while (!cancellationToken.IsCancellationRequested)
@ -124,6 +138,14 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
var stream = new PipeWriterStream(_application.Output);
await response.Content.CopyToAsync(stream);
var flushResult = await _application.Output.FlushAsync();
// We canceled in the middle of applying back pressure
// or if the consumer is done
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
}
}
@ -135,12 +157,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
catch (Exception ex)
{
Log.ErrorPolling(_logger, pollUrl, ex);
throw;
_error = ex;
}
finally
{
// Make sure the send loop is terminated
_transportCts.Cancel();
_application.Output.Complete(_error);
Log.ReceiveStopped(_logger);
}
}

View File

@ -20,6 +20,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
private readonly HttpClient _httpClient;
private readonly HttpOptions _httpOptions;
private readonly ILogger _logger;
// Volatile so that the SSE loop sees the updated value set from a different thread
private volatile Exception _error;
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser();
@ -55,21 +58,42 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
Log.StartTransport(_logger, transferFormat);
var startTcs = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
var sendTask = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _transportCts, _logger);
var receiveTask = OpenConnection(_application, url, startTcs, _transportCts.Token);
Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t =>
{
Log.TransportStopped(_logger, t.Exception?.InnerException);
_application.Output.Complete(t.Exception?.InnerException);
_application.Input.Complete();
return t;
}).Unwrap();
Running = ProcessAsync(url, startTcs);
return startTcs.Task;
}
private async Task ProcessAsync(Uri url, TaskCompletionSource<object> startTcs)
{
// Start sending and polling (ask for binary if the server supports it)
var receiving = OpenConnection(_application, url, startTcs, _transportCts.Token);
var sending = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _logger);
// Wait for send or receive to complete
var trigger = await Task.WhenAny(receiving, sending);
if (trigger == receiving)
{
// We're waiting for the application to finish and there are 2 things it could be doing
// 1. Waiting for application data
// 2. Waiting for an outgoing send (this should be instantaneous)
// Cancel the application so that ReadAsync yields
_application.Input.CancelPendingRead();
}
else
{
// Set the sending error so we communicate that to the application
_error = sending.IsFaulted ? sending.Exception.InnerException : null;
_transportCts.Cancel();
// Cancel any pending flush so that we can quit
_application.Output.CancelPendingFlush();
}
}
private async Task OpenConnection(IDuplexPipe application, Uri url, TaskCompletionSource<object> startTcs, CancellationToken cancellationToken)
{
Log.StartReceive(_logger);
@ -78,7 +102,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
SendUtils.PrepareHttpRequest(request, _httpOptions);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
HttpResponseMessage response;
HttpResponseMessage response = null;
try
{
response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
@ -87,11 +112,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
}
catch (Exception ex)
{
response?.Dispose();
Log.TransportStopping(_logger);
startTcs.TrySetException(ex);
return;
}
using (response)
using (var stream = await response.Content.ReadAsStreamAsync())
{
var pipeOptions = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0);
@ -116,12 +143,15 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
{
Log.ParsingSSE(_logger, input.Length);
var parseResult = _parser.ParseMessage(input, out consumed, out examined, out var buffer);
FlushResult flushResult = default;
switch (parseResult)
{
case ServerSentEventsMessageParser.ParseResult.Completed:
Log.MessageToApp(_logger, buffer.Length);
await _application.Output.WriteAsync(buffer);
flushResult = await _application.Output.WriteAsync(buffer);
_parser.Reset();
break;
case ServerSentEventsMessageParser.ParseResult.Incomplete:
@ -131,6 +161,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
}
break;
}
// We canceled in the middle of applying back pressure
// or if the consumer is done
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
finally
{
@ -142,10 +179,16 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
{
Log.ReceiveCanceled(_logger);
}
catch (Exception ex)
{
_error = ex;
}
finally
{
_application.Output.Complete(_error);
readCancellationRegistration.Dispose();
_transportCts.Cancel();
Log.ReceiveStopped(_logger);
}
}
@ -154,16 +197,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
public async Task StopAsync()
{
Log.TransportStopping(_logger);
_transportCts.Cancel();
try
{
await Running;
}
catch
{
// exceptions have been handled in the Running task continuation by closing the channel with the exception
}
_application.Input.CancelPendingRead();
await Running;
}
}
}

View File

@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
internal static class SendUtils
{
public static async Task SendMessages(Uri sendUrl, IDuplexPipe application, HttpClient httpClient,
HttpOptions httpOptions, CancellationTokenSource transportCts, ILogger logger)
HttpOptions httpOptions, ILogger logger)
{
Log.SendStarted(logger);
@ -25,14 +25,17 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
while (true)
{
var result = await application.Input.ReadAsync(transportCts.Token);
var result = await application.Input.ReadAsync();
var buffer = result.Buffer;
try
{
// Grab as many messages as we can from the pipe
if (result.IsCanceled)
{
Log.SendCanceled(logger);
break;
}
transportCts.Token.ThrowIfCancellationRequested();
if (!buffer.IsEmpty)
{
Log.SendingMessages(logger, buffer.Length, sendUrl);
@ -45,8 +48,14 @@ namespace Microsoft.AspNetCore.Sockets.Client
request.Content = new ReadOnlySequenceContent(buffer);
var response = await httpClient.SendAsync(request, transportCts.Token);
response.EnsureSuccessStatusCode();
// ResponseHeadersRead instructs SendAsync to return once headers are read
// rather than buffer the entire response. This gives a small perf boost.
// Note that it is important to dispose of the response when doing this to
// avoid leaving the connection open.
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
{
response.EnsureSuccessStatusCode();
}
Log.SentSuccessfully(logger);
}
@ -76,8 +85,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
finally
{
// Make sure the poll loop is terminated
transportCts.Cancel();
application.Input.Complete();
}
Log.SendStopped(logger);

View File

@ -210,7 +210,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await pair.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
await Assert.ThrowsAsync<HttpRequestException>(async () => await longPollingTransport.Running.OrTimeout());
await longPollingTransport.Running.OrTimeout();
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await pair.Transport.Input.ReadAllAsync().OrTimeout());
Assert.Contains(" 500 ", exception.Message);

View File

@ -154,7 +154,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), pair.Application, TransferFormat.Text, connection: Mock.Of<IConnection>()).OrTimeout();
var exception = await Assert.ThrowsAsync<FormatException>(() => sseTransport.Running.OrTimeout());
var exception = await Assert.ThrowsAsync<FormatException>(() => pair.Transport.Input.ReadAllAsync());
await sseTransport.Running.OrTimeout();
Assert.Equal("Incomplete message.", exception.Message);
}
}
@ -204,7 +207,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var exception = await Assert.ThrowsAsync<HttpRequestException>(() => pair.Transport.Input.ReadAllAsync().OrTimeout());
Assert.Contains("500", exception.Message);
Assert.Same(exception, await Assert.ThrowsAsync<HttpRequestException>(() => sseTransport.Running.OrTimeout()));
// Errors are only communicated through the pipe
await sseTransport.Running.OrTimeout();
}
}