Synchronize writing to connection and aborting connection (#10043)

This commit is contained in:
BrennanConroy 2019-05-12 00:38:07 -07:00 committed by GitHub
parent 8a7af8b493
commit 208299aa31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 52 additions and 18 deletions

View File

@ -41,7 +41,7 @@ namespace Microsoft.AspNetCore.SignalR
private bool _receivedMessageThisInterval = false;
private ReadOnlyMemory<byte> _cachedPingMessage;
private bool _clientTimeoutActive;
private bool _connectedAborted;
private bool _connectionAborted;
private int _streamBufferCapacity;
/// <summary>
@ -138,17 +138,18 @@ namespace Microsoft.AspNetCore.SignalR
public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken = default)
{
if (_connectedAborted)
{
return default;
}
// Try to grab the lock synchronously, if we fail, go to the slower path
if (!_writeLock.Wait(0))
{
return new ValueTask(WriteSlowAsync(message));
}
if (_connectionAborted)
{
_writeLock.Release();
return default;
}
// This method should never throw synchronously
var task = WriteCore(message);
@ -173,17 +174,18 @@ namespace Microsoft.AspNetCore.SignalR
/// <returns></returns>
public virtual ValueTask WriteAsync(SerializedHubMessage message, CancellationToken cancellationToken = default)
{
if (_connectedAborted)
{
return default;
}
// Try to grab the lock synchronously, if we fail, go to the slower path
if (!_writeLock.Wait(0))
{
return new ValueTask(WriteSlowAsync(message));
}
if (_connectionAborted)
{
_writeLock.Release();
return default;
}
// This method should never throw synchronously
var task = WriteCore(message);
@ -259,10 +261,15 @@ namespace Microsoft.AspNetCore.SignalR
private async Task WriteSlowAsync(HubMessage message)
{
// Failed to get the lock immediately when entering WriteAsync so await until it is available
await _writeLock.WaitAsync();
try
{
// Failed to get the lock immediately when entering WriteAsync so await until it is available
if (_connectionAborted)
{
return;
}
await WriteCore(message);
}
@ -279,10 +286,15 @@ namespace Microsoft.AspNetCore.SignalR
private async Task WriteSlowAsync(SerializedHubMessage message)
{
// Failed to get the lock immediately when entering WriteAsync so await until it is available
await _writeLock.WaitAsync();
try
{
// Failed to get the lock immediately when entering WriteAsync so await until it is available
await _writeLock.WaitAsync();
if (_connectionAborted)
{
return;
}
await WriteCore(message);
}
@ -313,6 +325,11 @@ namespace Microsoft.AspNetCore.SignalR
{
try
{
if (_connectionAborted)
{
return;
}
await _connectionContext.Transport.Output.WriteAsync(_cachedPingMessage);
Log.SentPing(_logger);
@ -320,6 +337,7 @@ namespace Microsoft.AspNetCore.SignalR
catch (Exception ex)
{
Log.FailedWritingMessage(_logger, ex);
Abort();
}
finally
{
@ -355,6 +373,8 @@ namespace Microsoft.AspNetCore.SignalR
/// </summary>
public virtual void Abort()
{
_connectionAborted = true;
// If we already triggered the token then noop, this isn't thread safe but it's good enough
// to avoid spawning a new task in the most common cases
if (_connectionAbortedTokenSource.IsCancellationRequested)
@ -362,8 +382,6 @@ namespace Microsoft.AspNetCore.SignalR
return;
}
_connectedAborted = true;
Input.CancelPendingRead();
// We fire and forget since this can trigger user code to run
@ -551,6 +569,7 @@ namespace Microsoft.AspNetCore.SignalR
private static void AbortConnection(object state)
{
var connection = (HubConnectionContext)state;
try
{
connection._connectionAbortedTokenSource.Cancel();
@ -561,8 +580,23 @@ namespace Microsoft.AspNetCore.SignalR
}
finally
{
// Communicate the fact that we're finished triggering abort callbacks
connection._abortCompletedTcs.TrySetResult(null);
_ = InnerAbortConnection(connection);
}
static async Task InnerAbortConnection(HubConnectionContext connection)
{
// We lock to make sure all writes are done before triggering the completion of the pipe
await connection._writeLock.WaitAsync();
try
{
// Communicate the fact that we're finished triggering abort callbacks
// HubOnDisconnectedAsync is waiting on this to complete the Pipe
connection._abortCompletedTcs.TrySetResult(null);
}
finally
{
connection._writeLock.Release();
}
}
}