From eb966eb60581be3cda3670c866f7194755af8605 Mon Sep 17 00:00:00 2001 From: Brennan Date: Mon, 12 Aug 2019 20:46:20 -0700 Subject: [PATCH] Complete upload streams on connection close (#13065) --- .../server/Core/src/HubConnectionContext.cs | 6 ++++ src/SignalR/server/Core/src/StreamTracker.cs | 8 +++++ .../HubConnectionHandlerTestUtils/Hubs.cs | 24 ++++++++++++++ .../SignalR/test/HubConnectionHandlerTests.cs | 31 +++++++++++++++++++ 4 files changed, 69 insertions(+) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index cfc0137b36..90db2563a0 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -583,6 +583,12 @@ namespace Microsoft.AspNetCore.SignalR finally { _ = InnerAbortConnection(connection); + + // Use _streamTracker to avoid lazy init from StreamTracker getter if it doesn't exist + if (connection._streamTracker != null) + { + connection._streamTracker.CompleteAll(new OperationCanceledException("The underlying connection was closed.")); + } } static async Task InnerAbortConnection(HubConnectionContext connection) diff --git a/src/SignalR/server/Core/src/StreamTracker.cs b/src/SignalR/server/Core/src/StreamTracker.cs index fc4833abbf..0df957d54b 100644 --- a/src/SignalR/server/Core/src/StreamTracker.cs +++ b/src/SignalR/server/Core/src/StreamTracker.cs @@ -77,6 +77,14 @@ namespace Microsoft.AspNetCore.SignalR return true; } + public void CompleteAll(Exception ex) + { + foreach (var converter in _lookup) + { + converter.Value.TryComplete(ex); + } + } + private static IStreamConverter BuildStream(int streamBufferCapacity) { return new ChannelConverter(streamBufferCapacity); diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs index d8c1292d5a..450697b318 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs @@ -298,6 +298,30 @@ namespace Microsoft.AspNetCore.SignalR.Tests output.Complete(); } } + + public async Task UploadDoesWorkOnComplete(ChannelReader source) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + Context.Items[nameof(UploadDoesWorkOnComplete)] = tcs.Task; + + try + { + while (await source.WaitToReadAsync()) + { + while (source.TryRead(out var item)) + { + } + } + } + catch (Exception ex) + { + tcs.SetException(ex); + } + finally + { + tcs.TrySetResult(42); + } + } } public abstract class TestHub : Hub diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 389e760f6c..bc31d32c4c 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -3705,6 +3705,37 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } + [Fact] + public async Task ConnectionCloseCleansUploadStreams() + { + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(); + var connectionHandler = serviceProvider.GetService>(); + + using (StartVerifiableLog()) + { + using var client = new TestClient(); + + var connectionHandlerTask = await client.ConnectAsync(connectionHandler); + + // Wait for a connection, or for the endpoint to fail. + await client.Connected.OrThrowIfOtherFails(connectionHandlerTask).OrTimeout(); + + await client.BeginUploadStreamAsync("invocation", nameof(MethodHub.UploadDoesWorkOnComplete), streamIds: new[] { "id" }, args: Array.Empty()).OrTimeout(); + + await client.SendHubMessageAsync(new StreamItemMessage("id", "hello")).OrTimeout(); + + await client.DisposeAsync().OrTimeout(); + + await connectionHandlerTask.OrTimeout(); + + // This task completes if the upload stream is completed, via closing the connection + var task = (Task)client.Connection.Items[nameof(MethodHub.UploadDoesWorkOnComplete)]; + + var exception = await Assert.ThrowsAsync(() => task).OrTimeout(); + Assert.Equal("The underlying connection was closed.", exception.Message); + } + } + private class CustomHubActivator : IHubActivator where THub : Hub { public int ReleaseCount;