Complete upload streams on connection close (#13065)

This commit is contained in:
Brennan 2019-08-12 20:46:20 -07:00 committed by GitHub
parent 116e5bdd9a
commit eb966eb605
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 0 deletions

View File

@ -583,6 +583,12 @@ namespace Microsoft.AspNetCore.SignalR
finally
{
_ = InnerAbortConnection(connection);
// Use _streamTracker to avoid lazy init from StreamTracker getter if it doesn't exist
if (connection._streamTracker != null)
{
connection._streamTracker.CompleteAll(new OperationCanceledException("The underlying connection was closed."));
}
}
static async Task InnerAbortConnection(HubConnectionContext connection)

View File

@ -77,6 +77,14 @@ namespace Microsoft.AspNetCore.SignalR
return true;
}
public void CompleteAll(Exception ex)
{
foreach (var converter in _lookup)
{
converter.Value.TryComplete(ex);
}
}
private static IStreamConverter BuildStream<T>(int streamBufferCapacity)
{
return new ChannelConverter<T>(streamBufferCapacity);

View File

@ -298,6 +298,30 @@ namespace Microsoft.AspNetCore.SignalR.Tests
output.Complete();
}
}
public async Task UploadDoesWorkOnComplete(ChannelReader<string> source)
{
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
Context.Items[nameof(UploadDoesWorkOnComplete)] = tcs.Task;
try
{
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
}
}
}
catch (Exception ex)
{
tcs.SetException(ex);
}
finally
{
tcs.TrySetResult(42);
}
}
}
public abstract class TestHub : Hub

View File

@ -3705,6 +3705,37 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task ConnectionCloseCleansUploadStreams()
{
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider();
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();
using (StartVerifiableLog())
{
using var client = new TestClient();
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
// Wait for a connection, or for the endpoint to fail.
await client.Connected.OrThrowIfOtherFails(connectionHandlerTask).OrTimeout();
await client.BeginUploadStreamAsync("invocation", nameof(MethodHub.UploadDoesWorkOnComplete), streamIds: new[] { "id" }, args: Array.Empty<object>()).OrTimeout();
await client.SendHubMessageAsync(new StreamItemMessage("id", "hello")).OrTimeout();
await client.DisposeAsync().OrTimeout();
await connectionHandlerTask.OrTimeout();
// This task completes if the upload stream is completed, via closing the connection
var task = (Task<int>)client.Connection.Items[nameof(MethodHub.UploadDoesWorkOnComplete)];
var exception = await Assert.ThrowsAsync<OperationCanceledException>(() => task).OrTimeout();
Assert.Equal("The underlying connection was closed.", exception.Message);
}
}
private class CustomHubActivator<THub> : IHubActivator<THub> where THub : Hub
{
public int ReleaseCount;