diff --git a/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs b/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs index 09a9491587..a185cd707b 100644 --- a/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs +++ b/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs @@ -348,12 +348,12 @@ namespace Microsoft.AspNetCore.SignalR.Protocol case HubProtocolConstants.StreamItemMessageType: if (itemToken != null) { - var returnType = binder.GetReturnType(invocationId); try { - item = itemToken.ToObject(returnType, PayloadSerializer); + var itemType = binder.GetStreamItemType(invocationId); + item = itemToken.ToObject(itemType, PayloadSerializer); } - catch (JsonSerializationException ex) + catch (Exception ex) { message = new StreamBindingFailureMessage(invocationId, ExceptionDispatchInfo.Capture(ex)); break; diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs index f532a94e27..da5df4d2fd 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs @@ -69,6 +69,12 @@ namespace Microsoft.AspNetCore.SignalR.Internal private static readonly Action _closingStreamWithBindingError = LoggerMessage.Define(LogLevel.Warning, new EventId(19, "ClosingStreamWithBindingError"), "Stream '{StreamId}' closed with error '{Error}'."); + private static readonly Action _unexpectedStreamCompletion = + LoggerMessage.Define(LogLevel.Debug, new EventId(20, "UnexpectedStreamCompletion"), "StreamCompletionMessage received unexpectedly."); + + private static readonly Action _unexpectedStreamItem = + LoggerMessage.Define(LogLevel.Debug, new EventId(21, "UnexpectedStreamItem"), "StreamItemMessage received unexpectedly."); + public static void ReceivedHubInvocation(ILogger logger, InvocationMessage invocationMessage) { _receivedHubInvocation(logger, invocationMessage, null); @@ -168,6 +174,16 @@ namespace Microsoft.AspNetCore.SignalR.Internal { _closingStreamWithBindingError(logger, message.InvocationId, message.Error, null); } + + public static void UnexpectedStreamCompletion(ILogger logger) + { + _unexpectedStreamCompletion(logger, null); + } + + public static void UnexpectedStreamItem(ILogger logger) + { + _unexpectedStreamItem(logger, null); + } } } } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index e137cc5998..01f470b0c7 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -130,14 +130,19 @@ namespace Microsoft.AspNetCore.SignalR.Internal break; case StreamItemMessage streamItem: - Log.ReceivedStreamItem(_logger, streamItem); return ProcessStreamItem(connection, streamItem); case CompletionMessage streamCompleteMessage: // closes channels, removes from Lookup dict // user's method can see the channel is complete and begin wrapping up - Log.CompletingStream(_logger, streamCompleteMessage); - connection.StreamTracker.Complete(streamCompleteMessage); + if (connection.StreamTracker.TryComplete(streamCompleteMessage)) + { + Log.CompletingStream(_logger, streamCompleteMessage); + } + else + { + Log.UnexpectedStreamCompletion(_logger); + } break; // Other kind of message we weren't expecting @@ -153,7 +158,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal { Log.FailedInvokingHubMethod(_logger, bindingFailureMessage.Target, bindingFailureMessage.BindingFailure.SourceException); - var errorMessage = ErrorMessageHelper.BuildErrorMessage($"Failed to invoke '{bindingFailureMessage.Target}' due to an error on the server.", bindingFailureMessage.BindingFailure.SourceException, _enableDetailedErrors); return SendInvocationError(bindingFailureMessage.InvocationId, connection, errorMessage); @@ -167,15 +171,25 @@ namespace Microsoft.AspNetCore.SignalR.Internal var message = CompletionMessage.WithError(bindingFailureMessage.Id, errorString); Log.ClosingStreamWithBindingError(_logger, message); - connection.StreamTracker.Complete(message); + + // ignore failure, it means the client already completed the stream or the stream never existed on the server + connection.StreamTracker.TryComplete(message); + + // TODO: Send stream completion message to client when we add it return Task.CompletedTask; } private Task ProcessStreamItem(HubConnectionContext connection, StreamItemMessage message) { + if (!connection.StreamTracker.TryProcessItem(message, out var processTask)) + { + Log.UnexpectedStreamItem(_logger); + return Task.CompletedTask; + } + Log.ReceivedStreamItem(_logger, message); - return connection.StreamTracker.ProcessItem(message); + return processTask; } private Task ProcessInvocation(HubConnectionContext connection, @@ -370,12 +384,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal { foreach (var stream in hubMessage.StreamIds) { - try - { - connection.StreamTracker.Complete(CompletionMessage.Empty(stream)); - } - // ignore failures, it means the client already completed the streams - catch (KeyNotFoundException) { } + connection.StreamTracker.TryComplete(CompletionMessage.Empty(stream)); } } diff --git a/src/SignalR/server/Core/src/StreamTracker.cs b/src/SignalR/server/Core/src/StreamTracker.cs index b9b7b1dd24..1445771046 100644 --- a/src/SignalR/server/Core/src/StreamTracker.cs +++ b/src/SignalR/server/Core/src/StreamTracker.cs @@ -28,36 +28,47 @@ namespace Microsoft.AspNetCore.SignalR return newConverter.GetReaderAsObject(); } - private IStreamConverter TryGetConverter(string streamId) + private bool TryGetConverter(string streamId, out IStreamConverter converter) { - if (_lookup.TryGetValue(streamId, out var converter)) + if (_lookup.TryGetValue(streamId, out converter)) { - return converter; - } - else - { - throw new KeyNotFoundException($"No stream with id '{streamId}' could be found."); + return true; } + + return false; } - public Task ProcessItem(StreamItemMessage message) + public bool TryProcessItem(StreamItemMessage message, out Task task) { - return TryGetConverter(message.InvocationId).WriteToStream(message.Item); + if (TryGetConverter(message.InvocationId, out var converter)) + { + task = converter.WriteToStream(message.Item); + return true; + } + + task = default; + return false; } public Type GetStreamItemType(string streamId) { - return TryGetConverter(streamId).GetItemType(); + if (TryGetConverter(streamId, out var converter)) + { + return converter.GetItemType(); + } + + throw new KeyNotFoundException($"No stream with id '{streamId}' could be found."); } - public void Complete(CompletionMessage message) + public bool TryComplete(CompletionMessage message) { _lookup.TryRemove(message.InvocationId, out var converter); if (converter == null) { - throw new KeyNotFoundException($"No stream with id '{message.InvocationId}' could be found."); + return false; } converter.TryComplete(message.HasResult || message.Error == null ? null : new Exception(message.Error)); + return true; } private static IStreamConverter BuildStream() diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 03c6438810..b51c47c229 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2918,13 +2918,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests [Fact] public async Task UploadStreamItemInvalidId() { - bool ExpectedErrors(WriteContext writeContext) - { - return writeContext.LoggerName == "Microsoft.AspNetCore.SignalR.HubConnectionHandler" && - writeContext.EventId.Name == "ErrorProcessingRequest"; - } - - using (StartVerifiableLog(ExpectedErrors)) + using (StartVerifiableLog()) { var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => { @@ -2937,24 +2931,19 @@ namespace Microsoft.AspNetCore.SignalR.Tests var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); await client.SendHubMessageAsync(new StreamItemMessage("fake_id", "not a number")).OrTimeout(); - // Client is breaking protocol by sending an invalid id, and should be closed. var message = client.TryRead(); - Assert.IsType(message); - Assert.Equal("Connection closed with an error. KeyNotFoundException: No stream with id 'fake_id' could be found.", ((CloseMessage)message).Error); + Assert.Null(message); } } + + Assert.Single(TestSink.Writes.Where(w => w.LoggerName == "Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher" && + w.EventId.Name == "ClosingStreamWithBindingError")); } [Fact] public async Task UploadStreamCompleteInvalidId() { - bool ExpectedErrors(WriteContext writeContext) - { - return writeContext.LoggerName == "Microsoft.AspNetCore.SignalR.HubConnectionHandler" && - writeContext.EventId.Name == "ErrorProcessingRequest"; - } - - using (StartVerifiableLog(ExpectedErrors)) + using (StartVerifiableLog()) { var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => { @@ -2967,12 +2956,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); await client.SendHubMessageAsync(CompletionMessage.Empty("fake_id")).OrTimeout(); - // Client is breaking protocol by sending an invalid id, and should be closed. var message = client.TryRead(); - Assert.IsType(message); - Assert.Equal("Connection closed with an error. KeyNotFoundException: No stream with id 'fake_id' could be found.", ((CloseMessage)message).Error); + Assert.Null(message); } } + + Assert.Single(TestSink.Writes.Where(w => w.LoggerName == "Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher" && + w.EventId.Name == "UnexpectedStreamCompletion")); } public static string CustomErrorMessage = "custom error for testing ::::)"; @@ -3088,20 +3078,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests [Fact] public async Task UploadStreamClosesStreamsOnServerWhenMethodCompletes() { - bool errorLogged = false; - bool ExpectedErrors(WriteContext writeContext) - { - if (writeContext.LoggerName == "Microsoft.AspNetCore.SignalR.HubConnectionHandler" && - writeContext.EventId.Name == "ErrorProcessingRequest") - { - errorLogged = true; - return true; - } - - return false; - } - - using (StartVerifiableLog(ExpectedErrors)) + using (StartVerifiableLog()) { var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(loggerFactory: LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3118,9 +3095,12 @@ namespace Microsoft.AspNetCore.SignalR.Tests var simpleCompletion = Assert.IsType(result); Assert.Null(simpleCompletion.Result); - // This will log an error on the server as the hub method has completed and will complete all associated streams + // This will log a warning on the server as the hub method has completed and will complete all associated streams await client.SendHubMessageAsync(new StreamItemMessage("id", "error!")).OrTimeout(); + // Check that the connection hasn't been closed + await client.SendInvocationAsync("VoidMethod").OrTimeout(); + // Shut down client.Dispose(); @@ -3128,27 +3108,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } - // Check that the stream has been completed by noting the existance of an error - Assert.True(errorLogged); + Assert.Single(TestSink.Writes.Where(w => w.LoggerName == "Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher" && + w.EventId.Name == "ClosingStreamWithBindingError")); } [Fact] public async Task UploadStreamAndStreamingMethodClosesStreamsOnServerWhenMethodCompletes() { - bool errorLogged = false; - bool ExpectedErrors(WriteContext writeContext) - { - if (writeContext.LoggerName == "Microsoft.AspNetCore.SignalR.HubConnectionHandler" && - writeContext.EventId.Name == "ErrorProcessingRequest") - { - errorLogged = true; - return true; - } - - return false; - } - - using (StartVerifiableLog(ExpectedErrors)) + using (StartVerifiableLog()) { var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(loggerFactory: LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3165,9 +3132,12 @@ namespace Microsoft.AspNetCore.SignalR.Tests var simpleCompletion = Assert.IsType(result); Assert.Null(simpleCompletion.Result); - // This will log an error on the server as the hub method has completed and will complete all associated streams + // This will log a warning on the server as the hub method has completed and will complete all associated streams await client.SendHubMessageAsync(new StreamItemMessage("id", "error!")).OrTimeout(); + // Check that the connection hasn't been closed + await client.SendInvocationAsync("VoidMethod").OrTimeout(); + // Shut down client.Dispose(); @@ -3175,8 +3145,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } - // Check that the stream has been completed by noting the existance of an error - Assert.True(errorLogged); + Assert.Single(TestSink.Writes.Where(w => w.LoggerName == "Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher" && + w.EventId.Name == "ClosingStreamWithBindingError")); } [Theory]