Add support for timing out poll requests (#538)

* Add support for timing out poll requests
- Default poll request is 110 seconds (like in previous versions of SignalR)
- Use 200 with a 0 content length for timeouts.
- Added support for not timing out while debugging
This commit is contained in:
David Fowler 2017-06-07 20:55:00 -10:00 committed by GitHub
parent d1df3671d8
commit 523517f60c
12 changed files with 196 additions and 45 deletions

View File

@ -173,8 +173,13 @@ export class LongPollingTransport implements ITransport {
if (pollXhr.status == 200) {
if (this.onDataReceived) {
try {
console.log(`(LongPolling transport) data received: ${pollXhr.response}`);
this.onDataReceived(pollXhr.response);
if (pollXhr.response) {
console.log(`(LongPolling transport) data received: ${pollXhr.response}`);
this.onDataReceived(pollXhr.response);
}
else {
console.log(`(LongPolling transport) timed out`);
}
} catch (error) {
if (this.onClosed) {
this.onClosed(error);
@ -210,7 +215,7 @@ export class LongPollingTransport implements ITransport {
this.pollXhr = pollXhr;
this.pollXhr.open("GET", url, true);
// TODO: consider making timeout configurable
this.pollXhr.timeout = 110000;
this.pollXhr.timeout = 120000;
this.pollXhr.send();
}

View File

@ -81,7 +81,7 @@ TBD: Keep Alive - Should it be done at this level?
Long Polling is a server-to-client half-transport, so it is always paired with HTTP Post. It requires a connection already be established using the `OPTIONS [endpoint-base]` request.
Long Polling requires that the client poll the server for new messages. Unlike traditional polling, if there is no data available, the server will simply wait for messages to be dispatched. At some point, the server, client or an upstream proxy will likely terminate the connection, at which point the client should immediately re-send the request. Long Polling is the only transport that allows a "reconnection" where a new request can be received while the server believes an existing request is in process. This can happen because of a time out. When this happens, the existing request is immediately terminated with status code `204 No Content`. Any messages which have already been written to the existing request will be flushed and considered sent.
Long Polling requires that the client poll the server for new messages. Unlike traditional polling, if there is no data available, the server will simply wait for messages to be dispatched. At some point, the server, client or an upstream proxy will likely terminate the connection, at which point the client should immediately re-send the request. Long Polling is the only transport that allows a "reconnection" where a new request can be received while the server believes an existing request is in process. This can happen because of a time out. When this happens, the existing request is immediately terminated with status code `204 No Content`. Any messages which have already been written to the existing request will be flushed and considered sent. In the case of a server side timeout with no data, a `200 OK` with a 0 `Content-Length` will be sent and the client should poll again for more data.
A Poll is established by sending an HTTP GET request to `[endpoint-base]` with the following query string parameters

View File

@ -4,10 +4,8 @@
using System;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -109,7 +107,6 @@ namespace Microsoft.AspNetCore.Sockets.Client
return;
}
}
}
}
}

View File

@ -139,24 +139,19 @@ namespace Microsoft.AspNetCore.Sockets
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Active)
{
_logger.LogDebug("Connection {connectionId} is already active via {requestId}. Cancelling previous request.", connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
var existing = connection.GetHttpContext();
_logger.LogDebug("Connection {connectionId} is already active via {requestId}. Cancelling previous request.", connection.ConnectionId, existing.TraceIdentifier);
using (connection.Cancellation)
{
// Cancel the previous request
connection.Cancellation.Cancel();
try
{
// Wait for the previous request to drain
await connection.TransportTask;
}
catch (OperationCanceledException)
{
// Should be a cancelled task
}
// Wait for the previous request to drain
await connection.TransportTask;
_logger.LogDebug("Previous poll cancelled for {connectionId} on {requestId}.", connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
_logger.LogDebug("Previous poll cancelled for {connectionId} on {requestId}.", connection.ConnectionId, existing.TraceIdentifier);
}
}
@ -177,15 +172,23 @@ namespace Microsoft.AspNetCore.Sockets
_logger.LogDebug("Resuming existing connection: {connectionId} on {requestId}", connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
}
var longPolling = new LongPollingTransport(connection.Application.Input, _loggerFactory);
// REVIEW: Performance of this isn't great as this does a bunch of per request allocations
connection.Cancellation = new CancellationTokenSource();
// REVIEW: Performance of this isn't great as this does a bunch of per request allocations
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(connection.Cancellation.Token, context.RequestAborted);
var timeoutSource = new CancellationTokenSource();
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(connection.Cancellation.Token, context.RequestAborted, timeoutSource.Token);
// Dispose these tokens when the request is over
context.Response.RegisterForDispose(timeoutSource);
context.Response.RegisterForDispose(tokenSource);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory);
// Start the transport
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
// Start the timeout after we return from creating the transport task
timeoutSource.CancelAfter(options.LongPolling.PollTimeout);
}
finally
{
@ -206,7 +209,7 @@ namespace Microsoft.AspNetCore.Sockets
// Wait for the transport to run
await connection.TransportTask;
// If the status code is a 204 it means we didn't write anything
// If the status code is a 204 it means the connection is done
if (context.Response.StatusCode == StatusCodes.Status204NoContent)
{
// We should be able to safely dispose because there's no more data being written
@ -216,7 +219,7 @@ namespace Microsoft.AspNetCore.Sockets
pollAgain = false;
}
}
else if (resultTask.IsCanceled)
else if (context.Response.StatusCode == StatusCodes.Status204NoContent)
{
// Don't poll if the transport task was cancelled
pollAgain = false;
@ -340,7 +343,7 @@ namespace Microsoft.AspNetCore.Sockets
jsonWriter.WriteStartArray();
if ((options.Transports & TransportType.WebSockets) != 0)
{
jsonWriter.WriteValue(nameof(TransportType.WebSockets));
jsonWriter.WriteValue(nameof(TransportType.WebSockets));
}
if ((options.Transports & TransportType.ServerSentEvents) != 0)
{

View File

@ -13,5 +13,7 @@ namespace Microsoft.AspNetCore.Sockets
public TransportType Transports { get; set; } = TransportType.All;
public WebSocketOptions WebSockets { get; } = new WebSocketOptions();
public LongPollingOptions LongPolling { get; } = new LongPollingOptions();
}
}

View File

@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.AspNetCore.Sockets
{
public class LongPollingOptions
{
public TimeSpan PollTimeout { get; set; } = TimeSpan.FromSeconds(110);
}
}

View File

@ -7,7 +7,6 @@ using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Transports
@ -16,9 +15,11 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
private readonly ReadableChannel<byte[]> _application;
private readonly ILogger _logger;
private readonly CancellationToken _timeoutToken;
public LongPollingTransport(ReadableChannel<byte[]> application, ILoggerFactory loggerFactory)
public LongPollingTransport(CancellationToken timeoutToken, ReadableChannel<byte[]> application, ILoggerFactory loggerFactory)
{
_timeoutToken = timeoutToken;
_application = application;
_logger = loggerFactory.CreateLogger<LongPollingTransport>();
}
@ -58,16 +59,32 @@ namespace Microsoft.AspNetCore.Sockets.Transports
}
catch (OperationCanceledException)
{
if (!context.RequestAborted.IsCancellationRequested)
// 3 cases:
// 1 - Request aborted, the client disconnected (no response)
// 2 - The poll timeout is hit (204)
// 3 - A new request comes in and cancels this request (205)
// Case 1
if (context.RequestAborted.IsCancellationRequested)
{
// Don't count this as cancellation, this is normal as the poll can end due to the browser closing.
// The background thread will eventually dispose this connection if it's inactive
_logger.LogDebug("Client disconnected from Long Polling endpoint.");
}
// Case 2
else if (_timeoutToken.IsCancellationRequested)
{
_logger.LogInformation("Poll request timed out. Sending 200 response.");
context.Response.ContentLength = 0;
context.Response.StatusCode = StatusCodes.Status200OK;
}
else
{
// Case 3
_logger.LogInformation("Terminating Long Polling connection by sending 204 response.");
context.Response.StatusCode = StatusCodes.Status204NoContent;
throw;
}
// Don't count this as cancellation, this is normal as the poll can end due to the browesr closing.
// The background thread will eventually dispose this connection if it's inactive
_logger.LogDebug("Client disconnected from Long Polling endpoint.");
}
catch (Exception ex)
{

View File

@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
@ -95,7 +96,7 @@ namespace Microsoft.AspNetCore.Sockets
try
{
if (_disposed)
if (_disposed || Debugger.IsAttached)
{
return;
}

View File

@ -2,7 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Tests.Common
@ -18,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests.Common
public static async Task OrTimeout(this Task task, TimeSpan timeout, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int? lineNumber = null)
{
var completed = await Task.WhenAny(task, Task.Delay(timeout));
var completed = await Task.WhenAny(task, Task.Delay(Debugger.IsAttached ? Timeout.InfiniteTimeSpan : timeout));
if (completed != task)
{
throw new TimeoutException(GetMessage(memberName, filePath, lineNumber));
@ -34,7 +36,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests.Common
public static async Task<T> OrTimeout<T>(this Task<T> task, TimeSpan timeout, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int? lineNumber = null)
{
var completed = await Task.WhenAny(task, Task.Delay(timeout));
var completed = await Task.WhenAny(task, Task.Delay(Debugger.IsAttached ? Timeout.InfiniteTimeSpan : timeout));
if (completed != task)
{
throw new TimeoutException(GetMessage(memberName, filePath, lineNumber));

View File

@ -92,6 +92,63 @@ namespace Microsoft.AspNetCore.Client.Tests
}
}
[Fact]
public async Task LongPollingTransportResponseWithNoContentDoesNotStopPoll()
{
int requests = 0;
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (requests == 0)
{
requests++;
return ResponseUtils.CreateResponse(HttpStatusCode.OK, "Hello");
}
else if (requests == 1)
{
requests++;
// Time out
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
}
else if (requests == 2)
{
requests++;
return ResponseUtils.CreateResponse(HttpStatusCode.OK, "World");
}
// Done
return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
try
{
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
var transportToConnection = Channel.CreateUnbounded<byte[]>();
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
var data = await transportToConnection.In.ReadAllAsync().OrTimeout();
await longPollingTransport.Running.OrTimeout();
Assert.True(transportToConnection.In.Completion.IsCompleted);
Assert.Equal(2, data.Count);
Assert.Equal(Encoding.UTF8.GetBytes("Hello"), data[0]);
Assert.Equal(Encoding.UTF8.GetBytes("World"), data[1]);
}
finally
{
await longPollingTransport.StopAsync();
}
}
}
[Fact]
public async Task LongPollingTransportStopsWhenPollRequestFails()
{

View File

@ -26,10 +26,6 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
public class HttpConnectionDispatcherTests
{
// Redefined from MessageFormatter because we want constants to go in the Attributes
private const string TextContentType = "application/vnd.microsoft.aspnetcore.endpoint-messages.v1+text";
private const string BinaryContentType = "application/vnd.microsoft.aspnetcore.endpoint-messages.v1+binary";
[Fact]
public async Task NegotiateReservesConnectionIdAndReturnsIt()
{
@ -62,6 +58,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var manager = CreateConnectionManager();
var dispatcher = new HttpConnectionDispatcher(manager, new LoggerFactory());
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
var services = new ServiceCollection();
services.AddEndPoint<TestEndPoint>();
services.AddOptions();
@ -96,6 +93,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var strm = new MemoryStream())
{
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
context.Response.Body = strm;
var services = new ServiceCollection();
@ -163,6 +161,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var strm = new MemoryStream())
{
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
context.Response.Body = strm;
var services = new ServiceCollection();
services.AddOptions();
@ -313,6 +312,28 @@ namespace Microsoft.AspNetCore.Sockets.Tests
Assert.False(exists);
}
[Fact]
public async Task LongPollingTimeoutSets200StatusCode()
{
var manager = CreateConnectionManager();
var connection = manager.CreateConnection();
var dispatcher = new HttpConnectionDispatcher(manager, new LoggerFactory());
var context = MakeRequest("/foo", connection);
var services = new ServiceCollection();
services.AddEndPoint<TestEndPoint>();
var builder = new SocketBuilder(services.BuildServiceProvider());
builder.UseEndPoint<TestEndPoint>();
var app = builder.Build();
var options = new HttpSocketOptions();
options.LongPolling.PollTimeout = TimeSpan.FromSeconds(2);
await dispatcher.ExecuteAsync(context, options, app).OrTimeout();
Assert.Equal(StatusCodes.Status200OK, context.Response.StatusCode);
}
[Fact]
public async Task WebSocketTransportTimesOutWhenCloseFrameNotReceived()
{
@ -648,6 +669,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var connection = manager.CreateConnection();
var dispatcher = new HttpConnectionDispatcher(manager, new LoggerFactory());
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
var services = new ServiceCollection();
services.AddOptions();
services.AddEndPoint<TestEndPoint>();
@ -696,6 +718,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var connection = manager.CreateConnection();
var dispatcher = new HttpConnectionDispatcher(manager, new LoggerFactory());
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
var services = new ServiceCollection();
services.AddOptions();
services.AddEndPoint<TestEndPoint>();
@ -740,6 +763,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
// reset HttpContext
context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
context.Request.Path = "/foo";
context.Request.Method = "GET";
context.RequestServices = sp;
@ -768,6 +792,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var connection = manager.CreateConnection();
var dispatcher = new HttpConnectionDispatcher(manager, new LoggerFactory());
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
var services = new ServiceCollection();
services.AddOptions();
services.AddEndPoint<TestEndPoint>();
@ -918,6 +943,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var strm = new MemoryStream())
{
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
context.Response.Body = strm;
var services = new ServiceCollection();
services.AddOptions();
@ -951,6 +977,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
private static DefaultHttpContext MakeRequest(string path, DefaultConnectionContext connection, string format = null)
{
var context = new DefaultHttpContext();
context.Features.Set<IHttpResponseFeature>(new ResponseFeature());
context.Request.Path = path;
context.Request.Method = "GET";
var values = new Dictionary<string, StringValues>();
@ -1039,4 +1066,16 @@ namespace Microsoft.AspNetCore.Sockets.Tests
}
}
}
public class ResponseFeature : HttpResponseFeature
{
public override void OnCompleted(Func<object, Task> callback, object state)
{
}
public override void OnStarting(Func<object, Task> callback, object state)
{
}
}
}

View File

@ -3,12 +3,12 @@
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Transports;
using Microsoft.Extensions.Logging;
using Xunit;
@ -22,7 +22,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var poll = new LongPollingTransport(channel, new LoggerFactory());
var poll = new LongPollingTransport(CancellationToken.None, channel, new LoggerFactory());
Assert.True(channel.Out.TryComplete());
@ -31,12 +31,29 @@ namespace Microsoft.AspNetCore.Sockets.Tests
Assert.Equal(204, context.Response.StatusCode);
}
[Fact]
public async Task Set200StatusCodeWhenTimeoutTokenFires()
{
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var timeoutToken = new CancellationToken(true);
var poll = new LongPollingTransport(timeoutToken, channel, new LoggerFactory());
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken, context.RequestAborted))
{
await poll.ProcessRequestAsync(context, cts.Token).OrTimeout();
Assert.Equal(0, context.Response.ContentLength);
Assert.Equal(200, context.Response.StatusCode);
}
}
[Fact]
public async Task FrameSentAsSingleResponse()
{
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var poll = new LongPollingTransport(channel, new LoggerFactory());
var poll = new LongPollingTransport(CancellationToken.None, channel, new LoggerFactory());
var ms = new MemoryStream();
context.Response.Body = ms;
@ -56,7 +73,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var poll = new LongPollingTransport(channel, new LoggerFactory());
var poll = new LongPollingTransport(CancellationToken.None, channel, new LoggerFactory());
var ms = new MemoryStream();
context.Response.Body = ms;