Add timeout to Event Queue drain (#619)

This commit is contained in:
Mikael Mengistu 2017-07-19 11:47:47 -07:00 committed by GitHub
parent 3010eaaee2
commit 8fc2cd98b6
3 changed files with 134 additions and 0 deletions

View File

@ -11,6 +11,7 @@ using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Sockets.Features;
using Microsoft.AspNetCore.Sockets.Client.Internal;
using Microsoft.AspNetCore.Sockets.Http.Internal;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -32,6 +33,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
private TaskQueue _eventQueue = new TaskQueue();
private readonly ITransportFactory _transportFactory;
private string _connectionId;
private readonly TimeSpan _eventQueueDrainTimeout = TimeSpan.FromSeconds(5);
private ReadableChannel<byte[]> Input => _transportChannel.In;
private WritableChannel<SendMessage> Output => _transportChannel.Out;
@ -173,6 +175,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
_logger.DrainEvents(_connectionId);
await _eventQueue.Drain();
await Task.WhenAny(_eventQueue.Drain().NoThrow(), Task.Delay(_eventQueueDrainTimeout));
_httpClient.Dispose();
_logger.RaiseClosed(_connectionId);

View File

@ -0,0 +1,29 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Sockets.Http.Internal
{
public static class TaskExtensions
{
public static async Task NoThrow(this Task task)
{
await new NoThrowAwaiter(task);
}
}
internal struct NoThrowAwaiter : ICriticalNotifyCompletion
{
private readonly Task _task;
public NoThrowAwaiter(Task task) { _task = task; }
public NoThrowAwaiter GetAwaiter() => this;
public bool IsCompleted => _task.IsCompleted;
// Observe exception
public void GetResult() { _ = _task.Exception; }
public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
}
}

View File

@ -433,6 +433,108 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await connection.DisposeAsync();
}
[Fact]
public async Task EventQueueTimeout()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return request.Method == HttpMethod.Options
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
{
channel = c;
return Task.CompletedTask;
});
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Out.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
var blockReceiveCallbackTcs = new TaskCompletionSource<object>();
var closedTcs = new TaskCompletionSource<object>();
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory: null, httpMessageHandler: mockHttpHandler.Object);
connection.Received +=
async m =>
{
await blockReceiveCallbackTcs.Task;
};
connection.Closed += _ => {
closedTcs.SetResult(null);
return Task.CompletedTask;
};
await connection.StartAsync();
channel.Out.TryWrite(Array.Empty<byte>());
// Ensure that SignalR isn't blocked by the receive callback
Assert.False(channel.In.TryRead(out var message));
await connection.DisposeAsync();
}
[Fact]
public async Task EventQueueTimeoutWithException()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return request.Method == HttpMethod.Options
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
{
channel = c;
return Task.CompletedTask;
});
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Out.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
var callbackInvokedTcs = new TaskCompletionSource<object>();
var closedTcs = new TaskCompletionSource<object>();
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory: null, httpMessageHandler: mockHttpHandler.Object);
connection.Received +=
m =>
{
throw new OperationCanceledException();
};
await connection.StartAsync();
channel.Out.TryWrite(Array.Empty<byte>());
// Ensure that SignalR isn't blocked by the receive callback
Assert.False(channel.In.TryRead(out var message));
await connection.DisposeAsync();
}
[Fact]
public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted()
{