From 208299aa31ef78de7da8d7bf2b96e8b8d5a98786 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Sun, 12 May 2019 00:38:07 -0700 Subject: [PATCH] Synchronize writing to connection and aborting connection (#10043) --- .../server/Core/src/HubConnectionContext.cs | 70 ++++++++++++++----- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index bc061336ae..fff578ff06 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -41,7 +41,7 @@ namespace Microsoft.AspNetCore.SignalR private bool _receivedMessageThisInterval = false; private ReadOnlyMemory _cachedPingMessage; private bool _clientTimeoutActive; - private bool _connectedAborted; + private bool _connectionAborted; private int _streamBufferCapacity; /// @@ -138,17 +138,18 @@ namespace Microsoft.AspNetCore.SignalR public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken = default) { - if (_connectedAborted) - { - return default; - } - // Try to grab the lock synchronously, if we fail, go to the slower path if (!_writeLock.Wait(0)) { return new ValueTask(WriteSlowAsync(message)); } + if (_connectionAborted) + { + _writeLock.Release(); + return default; + } + // This method should never throw synchronously var task = WriteCore(message); @@ -173,17 +174,18 @@ namespace Microsoft.AspNetCore.SignalR /// public virtual ValueTask WriteAsync(SerializedHubMessage message, CancellationToken cancellationToken = default) { - if (_connectedAborted) - { - return default; - } - // Try to grab the lock synchronously, if we fail, go to the slower path if (!_writeLock.Wait(0)) { return new ValueTask(WriteSlowAsync(message)); } + if (_connectionAborted) + { + _writeLock.Release(); + return default; + } + // This method should never throw synchronously var task = WriteCore(message); @@ -259,10 +261,15 @@ namespace Microsoft.AspNetCore.SignalR private async Task WriteSlowAsync(HubMessage message) { + // Failed to get the lock immediately when entering WriteAsync so await until it is available await _writeLock.WaitAsync(); + try { - // Failed to get the lock immediately when entering WriteAsync so await until it is available + if (_connectionAborted) + { + return; + } await WriteCore(message); } @@ -279,10 +286,15 @@ namespace Microsoft.AspNetCore.SignalR private async Task WriteSlowAsync(SerializedHubMessage message) { + // Failed to get the lock immediately when entering WriteAsync so await until it is available + await _writeLock.WaitAsync(); + try { - // Failed to get the lock immediately when entering WriteAsync so await until it is available - await _writeLock.WaitAsync(); + if (_connectionAborted) + { + return; + } await WriteCore(message); } @@ -313,6 +325,11 @@ namespace Microsoft.AspNetCore.SignalR { try { + if (_connectionAborted) + { + return; + } + await _connectionContext.Transport.Output.WriteAsync(_cachedPingMessage); Log.SentPing(_logger); @@ -320,6 +337,7 @@ namespace Microsoft.AspNetCore.SignalR catch (Exception ex) { Log.FailedWritingMessage(_logger, ex); + Abort(); } finally { @@ -355,6 +373,8 @@ namespace Microsoft.AspNetCore.SignalR /// public virtual void Abort() { + _connectionAborted = true; + // If we already triggered the token then noop, this isn't thread safe but it's good enough // to avoid spawning a new task in the most common cases if (_connectionAbortedTokenSource.IsCancellationRequested) @@ -362,8 +382,6 @@ namespace Microsoft.AspNetCore.SignalR return; } - _connectedAborted = true; - Input.CancelPendingRead(); // We fire and forget since this can trigger user code to run @@ -551,6 +569,7 @@ namespace Microsoft.AspNetCore.SignalR private static void AbortConnection(object state) { var connection = (HubConnectionContext)state; + try { connection._connectionAbortedTokenSource.Cancel(); @@ -561,8 +580,23 @@ namespace Microsoft.AspNetCore.SignalR } finally { - // Communicate the fact that we're finished triggering abort callbacks - connection._abortCompletedTcs.TrySetResult(null); + _ = InnerAbortConnection(connection); + } + + static async Task InnerAbortConnection(HubConnectionContext connection) + { + // We lock to make sure all writes are done before triggering the completion of the pipe + await connection._writeLock.WaitAsync(); + try + { + // Communicate the fact that we're finished triggering abort callbacks + // HubOnDisconnectedAsync is waiting on this to complete the Pipe + connection._abortCompletedTcs.TrySetResult(null); + } + finally + { + connection._writeLock.Release(); + } } }