Delegate disposable to the IConnectionFactory (#1999)

- Added DisposeAsync to the IConnectionFactory. It's responsible for disposing the connection after the pipe has closed.
- Added dispose callback to WithConnectionFactory
- Don't wait for poll request to end before unwinding from the transport
- Make sure all http requests are done before returning from StopAsync in both SSE and longpolling
This commit is contained in:
David Fowler 2018-04-13 09:16:23 -07:00 committed by GitHub
parent abe139ee16
commit 3e69fdc4ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 135 additions and 41 deletions

View File

@ -56,6 +56,12 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature()); connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
connection.Transport = _pipe; connection.Transport = _pipe;
return Task.FromResult<ConnectionContext>(connection); return Task.FromResult<ConnectionContext>(connection);
},
connection =>
{
connection.Transport.Output.Complete();
connection.Transport.Input.Complete();
return Task.CompletedTask;
}); });
_hubConnection = hubConnectionBuilder.Build(); _hubConnection = hubConnectionBuilder.Build();

View File

@ -46,6 +46,12 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature()); connection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestConnectionInherentKeepAliveFeature());
connection.Transport = _pipe; connection.Transport = _pipe;
return Task.FromResult<ConnectionContext>(connection); return Task.FromResult<ConnectionContext>(connection);
},
connection =>
{
connection.Transport.Output.Complete();
connection.Transport.Input.Complete();
return Task.CompletedTask;
}); });
_hubConnection = hubConnectionBuilder.Build(); _hubConnection = hubConnectionBuilder.Build();

View File

@ -31,7 +31,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
public static IHubConnectionBuilder WithEndPoint(this IHubConnectionBuilder builder, EndPoint endPoint) public static IHubConnectionBuilder WithEndPoint(this IHubConnectionBuilder builder, EndPoint endPoint)
{ {
builder.WithConnectionFactory(format => new TcpConnection(endPoint).StartAsync()); builder.WithConnectionFactory(
format => new TcpConnection(endPoint).StartAsync(),
connection => ((TcpConnection)connection).DisposeAsync()
);
return builder; return builder;
} }

View File

@ -15,8 +15,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{ {
public partial class LongPollingTransport : ITransport public partial class LongPollingTransport : ITransport
{ {
private static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(5);
private readonly HttpClient _httpClient; private readonly HttpClient _httpClient;
private readonly ILogger _logger; private readonly ILogger _logger;
private IDuplexPipe _application; private IDuplexPipe _application;
@ -32,8 +30,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public PipeWriter Output => _transport.Output; public PipeWriter Output => _transport.Output;
internal TimeSpan ShutdownTimeout { get; set; }
public LongPollingTransport(HttpClient httpClient) public LongPollingTransport(HttpClient httpClient)
: this(httpClient, null) : this(httpClient, null)
{ } { }
@ -42,7 +38,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{ {
_httpClient = httpClient; _httpClient = httpClient;
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>(); _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
ShutdownTimeout = DefaultShutdownTimeout;
} }
public Task StartAsync(Uri url, TransferFormat transferFormat) public Task StartAsync(Uri url, TransferFormat transferFormat)
@ -85,6 +80,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
// Cancel the application so that ReadAsync yields // Cancel the application so that ReadAsync yields
_application.Input.CancelPendingRead(); _application.Input.CancelPendingRead();
await sending;
} }
else else
{ {
@ -95,12 +92,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
// This will also cause the poll to return. // This will also cause the poll to return.
await SendDeleteRequest(url); await SendDeleteRequest(url);
// This timeout is only to ensure the poll is cleaned up despite a misbehaving server. _transportCts.Cancel();
// It doesn't need to be configurable.
_transportCts.CancelAfter(ShutdownTimeout);
// Cancel any pending flush so that we can quit // Cancel any pending flush so that we can quit
_application.Output.CancelPendingFlush(); _application.Output.CancelPendingFlush();
await receiving;
} }
} }
@ -199,18 +196,18 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
} }
} }
private async Task SendDeleteRequest(Uri pollUrl) private async Task SendDeleteRequest(Uri url)
{ {
try try
{ {
Log.SendingDeleteRequest(_logger, pollUrl); Log.SendingDeleteRequest(_logger, url);
var response = await _httpClient.DeleteAsync(pollUrl); var response = await _httpClient.DeleteAsync(url);
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
Log.DeleteRequestAccepted(_logger, pollUrl); Log.DeleteRequestAccepted(_logger, url);
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.ErrorSendingDeleteRequest(_logger, pollUrl, ex); Log.ErrorSendingDeleteRequest(_logger, url, ex);
} }
} }
} }

View File

@ -101,6 +101,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
// Cancel the application so that ReadAsync yields // Cancel the application so that ReadAsync yields
_application.Input.CancelPendingRead(); _application.Input.CancelPendingRead();
await sending;
} }
else else
{ {
@ -111,6 +113,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
// Cancel any pending flush so that we can quit // Cancel any pending flush so that we can quit
_application.Output.CancelPendingFlush(); _application.Output.CancelPendingFlush();
await receiving;
} }
} }

View File

@ -145,7 +145,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.ErrorStartingConnection(_logger, ex); Log.ErrorStartingConnection(_logger, ex);
// Can't have any invocations to cancel, we're in the lock. // Can't have any invocations to cancel, we're in the lock.
Complete(startingConnectionState.Connection); await CloseAsync(startingConnectionState.Connection);
throw; throw;
} }
@ -160,10 +160,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
} }
} }
private static void Complete(ConnectionContext connection) private Task CloseAsync(ConnectionContext connection)
{ {
connection.Transport.Output.Complete(); return _connectionFactory.DisposeAsync(connection);
connection.Transport.Input.Complete();
} }
// This method does both Dispose and Start, the 'disposing' flag indicates which. // This method does both Dispose and Start, the 'disposing' flag indicates which.
@ -661,7 +660,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
timeoutTimer?.Dispose(); timeoutTimer?.Dispose();
// Dispose the connection // Dispose the connection
Complete(connectionState.Connection); await CloseAsync(connectionState.Connection);
// Cancel any outstanding invocations within the connection lock // Cancel any outstanding invocations within the connection lock
connectionState.CancelOutstandingInvocations(connectionState.CloseException); connectionState.CancelOutstandingInvocations(connectionState.CloseException);

View File

@ -13,13 +13,15 @@ namespace Microsoft.AspNetCore.SignalR.Client
{ {
public static class HubConnectionBuilderExtensions public static class HubConnectionBuilderExtensions
{ {
public static IHubConnectionBuilder WithConnectionFactory(this IHubConnectionBuilder hubConnectionBuilder, Func<TransferFormat, Task<ConnectionContext>> connectionFactory) public static IHubConnectionBuilder WithConnectionFactory(this IHubConnectionBuilder hubConnectionBuilder,
Func<TransferFormat, Task<ConnectionContext>> connectionFactory,
Func<ConnectionContext, Task> disposeCallback)
{ {
if (connectionFactory == null) if (connectionFactory == null)
{ {
throw new ArgumentNullException(nameof(connectionFactory)); throw new ArgumentNullException(nameof(connectionFactory));
} }
hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(new DelegateConnectionFactory(connectionFactory)); hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(new DelegateConnectionFactory(connectionFactory, disposeCallback));
return hubConnectionBuilder; return hubConnectionBuilder;
} }
@ -38,16 +40,23 @@ namespace Microsoft.AspNetCore.SignalR.Client
private class DelegateConnectionFactory : IConnectionFactory private class DelegateConnectionFactory : IConnectionFactory
{ {
private readonly Func<TransferFormat, Task<ConnectionContext>> _connectionFactory; private readonly Func<TransferFormat, Task<ConnectionContext>> _connectionFactory;
private readonly Func<ConnectionContext, Task> _disposeCallback;
public DelegateConnectionFactory(Func<TransferFormat, Task<ConnectionContext>> connectionFactory) public DelegateConnectionFactory(Func<TransferFormat, Task<ConnectionContext>> connectionFactory, Func<ConnectionContext, Task> disposeCallback)
{ {
_connectionFactory = connectionFactory; _connectionFactory = connectionFactory;
_disposeCallback = disposeCallback;
} }
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat) public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
{ {
return _connectionFactory(transferFormat); return _connectionFactory(transferFormat);
} }
public Task DisposeAsync(ConnectionContext connection)
{
return _disposeCallback(connection);
}
} }
} }
} }

View File

@ -9,5 +9,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public interface IConnectionFactory public interface IConnectionFactory
{ {
Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat); Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat);
Task DisposeAsync(ConnectionContext connection);
} }
} }

View File

@ -37,5 +37,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
await connection.StartAsync(transferFormat); await connection.StartAsync(transferFormat);
return connection; return connection;
} }
public Task DisposeAsync(ConnectionContext connection)
{
return ((HttpConnection)connection).DisposeAsync();
}
} }
} }

View File

@ -51,7 +51,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
var hubConnectionBuilder = new HubConnectionBuilder(); var hubConnectionBuilder = new HubConnectionBuilder();
hubConnectionBuilder.WithHubProtocol(protocol); hubConnectionBuilder.WithHubProtocol(protocol);
hubConnectionBuilder.WithLoggerFactory(loggerFactory); hubConnectionBuilder.WithLoggerFactory(loggerFactory);
hubConnectionBuilder.WithConnectionFactory(GetHttpConnectionFactory(loggerFactory, path, transportType ?? HttpTransportType.LongPolling | HttpTransportType.WebSockets | HttpTransportType.ServerSentEvents)); hubConnectionBuilder.WithConnectionFactory(GetHttpConnectionFactory(loggerFactory, path, transportType ?? HttpTransportType.LongPolling | HttpTransportType.WebSockets | HttpTransportType.ServerSentEvents),
connection => ((HttpConnection)connection).DisposeAsync());
return hubConnectionBuilder.Build(); return hubConnectionBuilder.Build();
} }
@ -896,8 +897,16 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
var stopTask = hubConnection.StopAsync(); var stopTask = hubConnection.StopAsync();
// Stop async and wait for the poll to shut down. It should do so very quickly because the DELETE will stop the poll! try
await pollTracker.ActivePoll.OrTimeout(TimeSpan.FromMilliseconds(100)); {
// if we completed running before the poll or after the poll started then the task
// might complete successfully
await pollTracker.ActivePoll.OrTimeout();
}
catch (OperationCanceledException)
{
// If this happens it's fine because we were in the middle of a poll
}
await stopTask; await stopTask;
} }

View File

@ -55,7 +55,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
return Task.FromResult<ConnectionContext>(null); return Task.FromResult<ConnectionContext>(null);
}; };
var serviceProvider = new HubConnectionBuilder().WithConnectionFactory(connectionFactory).Services.BuildServiceProvider(); var serviceProvider = new HubConnectionBuilder().WithConnectionFactory(connectionFactory, connection => Task.CompletedTask).Services.BuildServiceProvider();
var factory = serviceProvider.GetService<IConnectionFactory>(); var factory = serviceProvider.GetService<IConnectionFactory>();
Assert.NotNull(factory); Assert.NotNull(factory);
@ -67,7 +67,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact] [Fact]
public void BuildCanOnlyBeCalledOnce() public void BuildCanOnlyBeCalledOnce()
{ {
var builder = new HubConnectionBuilder().WithConnectionFactory(format => null); var builder = new HubConnectionBuilder().WithConnectionFactory(format => null, connection => Task.CompletedTask);
Assert.NotNull(builder.Build()); Assert.NotNull(builder.Build());

View File

@ -27,14 +27,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
private HubConnection CreateHubConnection(TestConnection testConnection) private HubConnection CreateHubConnection(TestConnection testConnection)
{ {
var builder = new HubConnectionBuilder(); var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(format => testConnection.StartAsync(format)); builder.WithConnectionFactory(format => testConnection.StartAsync(format), connection => ((TestConnection)connection).DisposeAsync());
return builder.Build(); return builder.Build();
} }
private HubConnection CreateHubConnection(Func<TransferFormat, Task<ConnectionContext>> connectionFactory) private HubConnection CreateHubConnection(Func<TransferFormat, Task<ConnectionContext>> connectionFactory, Func<ConnectionContext, Task> disposeCallback)
{ {
var builder = new HubConnectionBuilder(); var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(format => connectionFactory(format)); builder.WithConnectionFactory(format => connectionFactory(format), disposeCallback);
return builder.Build(); return builder.Build();
} }
@ -86,7 +86,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
return new TestConnection().StartAsync(format); return new TestConnection().StartAsync(format);
} }
await AsyncUsing(CreateHubConnection(ConnectionFactory), async connection => Task DisposeAsync(ConnectionContext connection)
{
return ((TestConnection)connection).DisposeAsync();
}
await AsyncUsing(CreateHubConnection(ConnectionFactory, DisposeAsync), async connection =>
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
Assert.Equal(1, createCount); Assert.Equal(1, createCount);
@ -97,6 +102,41 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}); });
} }
[Fact]
public async Task StartingDuringStopCreatesANewConnection()
{
// Set up StartAsync to wait on the syncPoint when starting
var createCount = 0;
var onDisposeForFirstConnection = SyncPoint.Create(out var syncPoint);
Task<ConnectionContext> ConnectionFactory(TransferFormat format)
{
createCount += 1;
return new TestConnection(onDispose: createCount == 1 ? onDisposeForFirstConnection : null).StartAsync(format);
}
Task DisposeAsync(ConnectionContext connection) => ((TestConnection)connection).DisposeAsync();
await AsyncUsing(CreateHubConnection(ConnectionFactory, DisposeAsync), async connection =>
{
await connection.StartAsync().OrTimeout();
Assert.Equal(1, createCount);
var stopTask = connection.StopAsync().OrTimeout();
// Wait to hit DisposeAsync on TestConnection (which should be after StopAsync has cleared the connection state)
await syncPoint.WaitForSyncPoint();
// We should be able to start now, and StopAsync hasn't completed, nor will it complete while Starting
Assert.False(stopTask.IsCompleted);
await connection.StartAsync().OrTimeout();
Assert.False(stopTask.IsCompleted);
// When we release the sync point, the StopAsync task will finish
syncPoint.Continue();
await stopTask;
});
}
[Fact] [Fact]
public async Task StartAsyncWithFailedHandshakeCanBeStopped() public async Task StartAsyncWithFailedHandshakeCanBeStopped()
{ {

View File

@ -11,7 +11,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{ {
await connection.StartAsync(format); await connection.StartAsync(format);
return connection; return connection;
}); },
connecton => ((TestConnection)connection).DisposeAsync());
if (protocol != null) if (protocol != null)
{ {
builder.WithHubProtocol(protocol); builder.WithHubProtocol(protocol);

View File

@ -41,7 +41,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task ClosedEventRaisedWhenTheClientIsStopped() public async Task ClosedEventRaisedWhenTheClientIsStopped()
{ {
var builder = new HubConnectionBuilder(); var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(format => new TestConnection().StartAsync(format)); builder.WithConnectionFactory(format => new TestConnection().StartAsync(format),
connection => ((TestConnection)connection).DisposeAsync());
var hubConnection = builder.Build(); var hubConnection = builder.Build();
var closedEventTcs = new TaskCompletionSource<Exception>(); var closedEventTcs = new TaskCompletionSource<Exception>();

View File

@ -277,7 +277,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
} }
[Fact] [Fact]
public async Task LongPollingTransportShutsDownAfterTimeoutEvenIfServerDoesntCompletePoll() public async Task LongPollingTransportShutsDownImmediatelyEvenIfServerDoesntCompletePoll()
{ {
var mockHttpHandler = new Mock<HttpMessageHandler>(); var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected() mockHttpHandler.Protected()
@ -291,7 +291,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
using (var httpClient = new HttpClient(mockHttpHandler.Object)) using (var httpClient = new HttpClient(mockHttpHandler.Object))
{ {
var longPollingTransport = new LongPollingTransport(httpClient); var longPollingTransport = new LongPollingTransport(httpClient);
longPollingTransport.ShutdownTimeout = TimeSpan.FromMilliseconds(1);
try try
{ {
@ -366,6 +365,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task LongPollingTransportSendsAvailableMessagesWhenTheyArrive() public async Task LongPollingTransportSendsAvailableMessagesWhenTheyArrive()
{ {
var sentRequests = new List<byte[]>(); var sentRequests = new List<byte[]>();
var tcs = new TaskCompletionSource<HttpResponseMessage>();
var mockHttpHandler = new Mock<HttpMessageHandler>(); var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected() mockHttpHandler.Protected()
@ -378,12 +378,22 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// Build a new request object, but convert the entire payload to string // Build a new request object, but convert the entire payload to string
sentRequests.Add(await request.Content.ReadAsByteArrayAsync()); sentRequests.Add(await request.Content.ReadAsByteArrayAsync());
} }
else if (request.Method == HttpMethod.Get)
{
// This is the poll task
return await tcs.Task;
}
else if (request.Method == HttpMethod.Delete)
{
tcs.TrySetResult(ResponseUtils.CreateResponse(HttpStatusCode.NoContent));
}
return ResponseUtils.CreateResponse(HttpStatusCode.OK); return ResponseUtils.CreateResponse(HttpStatusCode.OK);
}); });
using (var httpClient = new HttpClient(mockHttpHandler.Object)) using (var httpClient = new HttpClient(mockHttpHandler.Object))
{ {
var longPollingTransport = new LongPollingTransport(httpClient); var longPollingTransport = new LongPollingTransport(httpClient);
try try
{ {
// Start the transport // Start the transport

View File

@ -58,8 +58,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Application.Input.OnWriterCompleted((ex, _) => Application.Input.OnWriterCompleted((ex, _) =>
{ {
Application.Output.Complete(); Application.Output.Complete();
_ = DisposeAsync();
}, },
null); null);
} }

View File

@ -76,10 +76,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
[MemberData(nameof(TransportTypes))] [MemberData(nameof(TransportTypes))]
public async Task CanStartAndStopConnectionUsingGivenTransport(HttpTransportType transportType) public async Task CanStartAndStopConnectionUsingGivenTransport(HttpTransportType transportType)
{ {
var url = _serverFixture.Url + "/echo"; using (StartLog(out var loggerFactory))
var connection = new HttpConnection(new Uri(url), transportType); {
await connection.StartAsync(TransferFormat.Text).OrTimeout(); var url = _serverFixture.Url + "/echo";
await connection.DisposeAsync().OrTimeout(); var connection = new HttpConnection(new Uri(url), transportType, loggerFactory);
await connection.StartAsync(TransferFormat.Text).OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
} }
[ConditionalFact] [ConditionalFact]