Merged PR 4506: [SignalR] Wait to complete pipe and cancel long sends

This commit is contained in:
Brennan Conroy 2019-11-25 19:37:46 +00:00 committed by Andrew Stanton-Nurse
parent bc1b9f020d
commit a0183b1fac
24 changed files with 940 additions and 120 deletions

View File

@ -62,7 +62,7 @@ variables:
- name: _BuildArgs
value: ''
- name: _SignType
valule: test
value: test
- name: _PublishArgs
value: ''
# used for post-build phases, internal builds only

View File

@ -2,7 +2,7 @@
<Project>
<PropertyGroup>
<MSBuildAllProjects>$(MSBuildAllProjects);$(MSBuildThisFileFullPath)</MSBuildAllProjects>
<AspNetCoreBaselineVersion>3.0.0</AspNetCoreBaselineVersion>
<AspNetCoreBaselineVersion>3.0.1</AspNetCoreBaselineVersion>
</PropertyGroup>
<!-- Package: AspNetCoreRuntime.3.0.x64-->
<PropertyGroup Condition=" '$(PackageId)' == 'AspNetCoreRuntime.3.0.x64' ">
@ -35,7 +35,7 @@
</ItemGroup>
<!-- Package: Microsoft.AspNetCore.App.Runtime.win-x64-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.App.Runtime.win-x64' ">
<BaselinePackageVersion>3.0.0</BaselinePackageVersion>
<BaselinePackageVersion>3.0.1</BaselinePackageVersion>
</PropertyGroup>
<!-- Package: Microsoft.AspNetCore.Authentication.AzureAD.UI-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Authentication.AzureAD.UI' ">
@ -407,14 +407,14 @@
</ItemGroup>
<!-- Package: Microsoft.AspNetCore.Http.Features-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Http.Features' ">
<BaselinePackageVersion>3.0.0</BaselinePackageVersion>
<BaselinePackageVersion>3.0.1</BaselinePackageVersion>
</PropertyGroup>
<ItemGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Http.Features' AND '$(TargetFramework)' == 'netcoreapp3.0' ">
<BaselinePackageReference Include="Microsoft.Extensions.Primitives" Version="[3.0.0, )" />
<BaselinePackageReference Include="Microsoft.Extensions.Primitives" Version="[3.0.1, )" />
<BaselinePackageReference Include="System.IO.Pipelines" Version="[4.6.0, )" />
</ItemGroup>
<ItemGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Http.Features' AND '$(TargetFramework)' == 'netstandard2.0' ">
<BaselinePackageReference Include="Microsoft.Extensions.Primitives" Version="[3.0.0, )" />
<BaselinePackageReference Include="Microsoft.Extensions.Primitives" Version="[3.0.1, )" />
<BaselinePackageReference Include="System.IO.Pipelines" Version="[4.6.0, )" />
</ItemGroup>
<!-- Package: Microsoft.AspNetCore.Identity.EntityFrameworkCore-->
@ -635,19 +635,19 @@
</PropertyGroup>
<!-- Package: Microsoft.DotNet.Web.Client.ItemTemplates-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.DotNet.Web.Client.ItemTemplates' ">
<BaselinePackageVersion>3.0.0</BaselinePackageVersion>
<BaselinePackageVersion>3.0.1</BaselinePackageVersion>
</PropertyGroup>
<!-- Package: Microsoft.DotNet.Web.ItemTemplates-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.DotNet.Web.ItemTemplates' ">
<BaselinePackageVersion>3.0.0</BaselinePackageVersion>
<BaselinePackageVersion>3.0.1</BaselinePackageVersion>
</PropertyGroup>
<!-- Package: Microsoft.DotNet.Web.ProjectTemplates.3.0-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.DotNet.Web.ProjectTemplates.3.0' ">
<BaselinePackageVersion>3.0.0</BaselinePackageVersion>
<BaselinePackageVersion>3.0.1</BaselinePackageVersion>
</PropertyGroup>
<!-- Package: Microsoft.DotNet.Web.Spa.ProjectTemplates.3.0-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.DotNet.Web.Spa.ProjectTemplates.3.0' ">
<BaselinePackageVersion>3.0.0</BaselinePackageVersion>
<BaselinePackageVersion>3.0.1</BaselinePackageVersion>
</PropertyGroup>
<!-- Package: Microsoft.Extensions.ApiDescription.Client-->
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.Extensions.ApiDescription.Client' ">

View File

@ -1,16 +1,15 @@
<!--
This file contains a list of all the packages and their versions which were released in ASP.NET Core 3.0.0.
This file contains a list of all the packages and their versions which were released in ASP.NET Core 3.0.0.
Update this list when preparing for a new patch.
-->
<Baseline Version="3.0.0">
<Baseline Version="3.0.1">
<Package Id="AspNetCoreRuntime.3.0.x64" Version="3.0.0" />
<Package Id="AspNetCoreRuntime.3.0.x86" Version="3.0.0" />
<Package Id="dotnet-sql-cache" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.ApiAuthorization.IdentityServer" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.App.Runtime.win-x64" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.App.Runtime.win-x64" Version="3.0.1" />
<Package Id="Microsoft.AspNetCore.Authentication.AzureAD.UI" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Authentication.AzureADB2C.UI" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Authentication.Certificate" Version="3.0.0" />
@ -53,7 +52,7 @@ Update this list when preparing for a new patch.
<Package Id="Microsoft.AspNetCore.Hosting.WindowsServices" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Http.Connections.Client" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Http.Connections.Common" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Http.Features" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Http.Features" Version="3.0.1" />
<Package Id="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Identity.Specification.Tests" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.Identity.UI" Version="3.0.0" />
@ -78,10 +77,10 @@ Update this list when preparing for a new patch.
<Package Id="Microsoft.AspNetCore.SpaServices.Extensions" Version="3.0.0" />
<Package Id="Microsoft.AspNetCore.TestHost" Version="3.0.0" />
<Package Id="Microsoft.dotnet-openapi" Version="3.0.0" />
<Package Id="Microsoft.DotNet.Web.Client.ItemTemplates" Version="3.0.0" />
<Package Id="Microsoft.DotNet.Web.ItemTemplates" Version="3.0.0" />
<Package Id="Microsoft.DotNet.Web.ProjectTemplates.3.0" Version="3.0.0" />
<Package Id="Microsoft.DotNet.Web.Spa.ProjectTemplates.3.0" Version="3.0.0" />
<Package Id="Microsoft.DotNet.Web.Client.ItemTemplates" Version="3.0.1" />
<Package Id="Microsoft.DotNet.Web.ItemTemplates" Version="3.0.1" />
<Package Id="Microsoft.DotNet.Web.ProjectTemplates.3.0" Version="3.0.1" />
<Package Id="Microsoft.DotNet.Web.Spa.ProjectTemplates.3.0" Version="3.0.1" />
<Package Id="Microsoft.Extensions.ApiDescription.Client" Version="3.0.0" />
<Package Id="Microsoft.Extensions.ApiDescription.Server" Version="3.0.0" />
<Package Id="Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore" Version="3.0.0" />

View File

@ -31,6 +31,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
IHttpTransportFeature,
IConnectionInherentKeepAliveFeature
{
private static long _tenSeconds = TimeSpan.FromSeconds(10).Ticks;
private readonly object _stateLock = new object();
private readonly object _itemsLock = new object();
private readonly object _heartbeatLock = new object();
@ -40,6 +42,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
private IDuplexPipe _application;
private IDictionary<object, object> _items;
private CancellationTokenSource _sendCts;
private bool _activeSend;
private long _startedSendTime;
private readonly object _sendingLock = new object();
internal CancellationToken SendingToken { get; private set; }
// This tcs exists so that multiple calls to DisposeAsync all wait asynchronously
// on the same task
private readonly TaskCompletionSource<object> _disposeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
@ -258,8 +266,26 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
}
else
{
// The other transports don't close their own output, so we can do it here safely
Application?.Output.Complete();
// Normally it isn't safe to try and acquire this lock because the Send can hold onto it for a long time if there is backpressure
// It is safe to wait for this lock now because the Send will be in one of 4 states
// 1. In the middle of a write which is in the middle of being canceled by the CancelPendingFlush above, when it throws
// an OperationCanceledException it will complete the PipeWriter which will make any other Send waiting on the lock
// throw an InvalidOperationException if they call Write
// 2. About to write and see that there is a pending cancel from the CancelPendingFlush, go to 1 to see what happens
// 3. Enters the Send and sees the Dispose state from DisposeAndRemoveAsync and releases the lock
// 4. No Send in progress
await WriteLock.WaitAsync();
try
{
// Complete the applications read loop
Application?.Output.Complete();
}
finally
{
WriteLock.Release();
}
Application?.Input.CancelPendingRead();
}
}
@ -401,7 +427,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
nonClonedContext.Response.RegisterForDispose(timeoutSource);
nonClonedContext.Response.RegisterForDispose(tokenSource);
var longPolling = new LongPollingServerTransport(timeoutSource.Token, Application.Input, loggerFactory);
var longPolling = new LongPollingServerTransport(timeoutSource.Token, Application.Input, loggerFactory, this);
// Start the transport
TransportTask = longPolling.ProcessRequestAsync(nonClonedContext, tokenSource.Token);
@ -507,6 +533,40 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
await connectionDelegate(this);
}
internal void StartSendCancellation()
{
lock (_sendingLock)
{
if (_sendCts == null || _sendCts.IsCancellationRequested)
{
_sendCts = new CancellationTokenSource();
SendingToken = _sendCts.Token;
}
_startedSendTime = DateTime.UtcNow.Ticks;
_activeSend = true;
}
}
internal void TryCancelSend(long currentTicks)
{
lock (_sendingLock)
{
if (_activeSend)
{
if (currentTicks - _startedSendTime > _tenSeconds)
{
_sendCts.Cancel();
}
}
}
}
internal void StopSendCancellation()
{
lock (_sendingLock)
{
_activeSend = false;
}
}
private static class Log
{
private static readonly Action<ILogger, string, Exception> _disposingConnection =

View File

@ -142,7 +142,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
connection.SupportedFormats = TransferFormat.Text;
// We only need to provide the Input channel since writing to the application is handled through /send.
var sse = new ServerSentEventsServerTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory);
var sse = new ServerSentEventsServerTransport(connection.Application.Input, connection.ConnectionId, connection, _loggerFactory);
await DoPersistentConnection(connectionDelegate, sse, context, connection);
}
@ -216,7 +216,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
connection.Transport.Output.Complete(connection.ApplicationTask.Exception);
// Wait for the transport to run
await connection.TransportTask;
// Ignore exceptions, it has been logged if there is one and the application has finished
// So there is no one to give the exception to
await connection.TransportTask.NoThrow();
// If the status code is a 204 it means the connection is done
if (context.Response.StatusCode == StatusCodes.Status204NoContent)
@ -234,12 +236,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
connection.MarkInactive();
}
}
else if (resultTask.IsFaulted)
else if (resultTask.IsFaulted || resultTask.IsCanceled)
{
// Cancel current request to release any waiting poll and let dispose acquire the lock
currentRequestTcs.TrySetCanceled();
// transport task was faulted, we should remove the connection
// We should be able to safely dispose because there's no more data being written
// We don't need to wait for close here since we've already waited for both sides
await _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
}
else
@ -434,6 +436,14 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
context.Response.StatusCode = StatusCodes.Status404NotFound;
context.Response.ContentType = "text/plain";
// There are no writes anymore (since this is the write "loop")
// So it is safe to complete the writer
// We complete the writer here because we already have the WriteLock acquired
// and it's unsafe to complete outside of the lock
// Other code isn't guaranteed to be able to acquire the lock before another write
// even if CancelPendingFlush is called, and the other write could hang if there is backpressure
connection.Application.Output.Complete();
return;
}
catch (IOException ex)
@ -481,11 +491,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
Log.TerminatingConection(_logger);
// Complete the receiving end of the pipe
connection.Application.Output.Complete();
// Dispose the connection gracefully, but don't wait for it. We assign it here so we can wait in tests
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
// Dispose the connection, but don't wait for it. We assign it here so we can wait in tests
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
context.Response.StatusCode = StatusCodes.Status202Accepted;
context.Response.ContentType = "text/plain";

View File

@ -31,6 +31,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
private readonly TimerAwaitable _nextHeartbeat;
private readonly ILogger<HttpConnectionManager> _logger;
private readonly ILogger<HttpConnectionContext> _connectionLogger;
private readonly bool _useSendTimeout = true;
private readonly TimeSpan _disconnectTimeout;
public HttpConnectionManager(ILoggerFactory loggerFactory, IHostApplicationLifetime appLifetime)
@ -44,6 +45,11 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
_connectionLogger = loggerFactory.CreateLogger<HttpConnectionContext>();
_nextHeartbeat = new TimerAwaitable(_heartbeatTickRate, _heartbeatTickRate);
_disconnectTimeout = connectionOptions.Value.DisconnectTimeout ?? ConnectionOptionsSetup.DefaultDisconectTimeout;
if (AppContext.TryGetSwitch("Microsoft.AspNetCore.Http.Connections.DoNotUseSendTimeout", out var timeoutDisabled))
{
_useSendTimeout = !timeoutDisabled;
}
// Register these last as the callbacks could run immediately
appLifetime.ApplicationStarted.Register(() => Start());
appLifetime.ApplicationStopping.Register(() => CloseConnections());
@ -155,20 +161,26 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
// Capture the connection state
var lastSeenUtc = connection.LastSeenUtcIfInactive;
var utcNow = DateTimeOffset.UtcNow;
// Once the decision has been made to dispose we don't check the status again
// But don't clean up connections while the debugger is attached.
if (!Debugger.IsAttached && lastSeenUtc.HasValue && (DateTimeOffset.UtcNow - lastSeenUtc.Value).TotalSeconds > _disconnectTimeout.TotalSeconds)
if (!Debugger.IsAttached && lastSeenUtc.HasValue && (utcNow - lastSeenUtc.Value).TotalSeconds > _disconnectTimeout.TotalSeconds)
{
Log.ConnectionTimedOut(_logger, connection.ConnectionId);
HttpConnectionsEventSource.Log.ConnectionTimedOut(connection.ConnectionId);
// This is most likely a long polling connection. The transport here ends because
// a poll completed and has been inactive for > 5 seconds so we wait for the
// a poll completed and has been inactive for > 5 seconds so we wait for the
// application to finish gracefully
_ = DisposeAndRemoveAsync(connection, closeGracefully: true);
}
else
{
if (!Debugger.IsAttached && _useSendTimeout)
{
connection.TryCancelSend(utcNow.Ticks);
}
// Tick the heartbeat, if the connection is still active
connection.TickHeartbeat();
}

View File

@ -0,0 +1,24 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Runtime.CompilerServices;
namespace System.Threading.Tasks
{
internal static class TaskExtensions
{
public static async Task NoThrow(this Task task)
{
await new NoThrowAwaiter(task);
}
}
internal readonly struct NoThrowAwaiter : ICriticalNotifyCompletion
{
private readonly Task _task;
public NoThrowAwaiter(Task task) { _task = task; }
public NoThrowAwaiter GetAwaiter() => this;
public bool IsCompleted => _task.IsCompleted;
// Observe exception
public void GetResult() { _ = _task.Exception; }
public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
}
}

View File

@ -16,12 +16,19 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
private readonly PipeReader _application;
private readonly ILogger _logger;
private readonly CancellationToken _timeoutToken;
private readonly HttpConnectionContext _connection;
public LongPollingServerTransport(CancellationToken timeoutToken, PipeReader application, ILoggerFactory loggerFactory)
: this(timeoutToken, application, loggerFactory, connection: null)
{ }
public LongPollingServerTransport(CancellationToken timeoutToken, PipeReader application, ILoggerFactory loggerFactory, HttpConnectionContext connection)
{
_timeoutToken = timeoutToken;
_application = application;
_connection = connection;
// We create the logger with a string to preserve the logging namespace after the server side transport renames.
_logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Http.Connections.Internal.Transports.LongPollingTransport");
}
@ -33,7 +40,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
var result = await _application.ReadAsync(token);
var buffer = result.Buffer;
if (buffer.IsEmpty && result.IsCompleted)
if (buffer.IsEmpty && (result.IsCompleted || result.IsCanceled))
{
Log.LongPolling204(_logger);
context.Response.ContentType = "text/plain";
@ -51,19 +58,22 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
try
{
await context.Response.Body.WriteAsync(buffer);
_connection?.StartSendCancellation();
await context.Response.Body.WriteAsync(buffer, _connection?.SendingToken ?? default);
}
finally
{
_connection?.StopSendCancellation();
_application.AdvanceTo(buffer.End);
}
}
catch (OperationCanceledException)
{
// 3 cases:
// 4 cases:
// 1 - Request aborted, the client disconnected (no response)
// 2 - The poll timeout is hit (200)
// 3 - A new request comes in and cancels this request (204)
// 3 - SendingToken was canceled, abort the connection
// 4 - A new request comes in and cancels this request (204)
// Case 1
if (context.RequestAborted.IsCancellationRequested)
@ -81,9 +91,16 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
context.Response.ContentType = "text/plain";
context.Response.StatusCode = StatusCodes.Status200OK;
}
else
else if (_connection?.SendingToken.IsCancellationRequested == true)
{
// Case 3
context.Response.ContentType = "text/plain";
context.Response.StatusCode = StatusCodes.Status204NoContent;
throw;
}
else
{
// Case 4
Log.LongPolling204(_logger);
context.Response.ContentType = "text/plain";
context.Response.StatusCode = StatusCodes.Status204NoContent;

View File

@ -16,11 +16,17 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
private readonly PipeReader _application;
private readonly string _connectionId;
private readonly ILogger _logger;
private readonly HttpConnectionContext _connection;
public ServerSentEventsServerTransport(PipeReader application, string connectionId, ILoggerFactory loggerFactory)
: this(application, connectionId, connection: null, loggerFactory)
{ }
public ServerSentEventsServerTransport(PipeReader application, string connectionId, HttpConnectionContext connection, ILoggerFactory loggerFactory)
{
_application = application;
_connectionId = connectionId;
_connection = connection;
// We create the logger with a string to preserve the logging namespace after the server side transport renames.
_logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Http.Connections.Internal.Transports.ServerSentEventsTransport");
@ -51,11 +57,17 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
try
{
if (result.IsCanceled)
{
break;
}
if (!buffer.IsEmpty)
{
Log.SSEWritingMessage(_logger, buffer.Length);
await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, context.Response.Body);
_connection?.StartSendCancellation();
await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, context.Response.Body, _connection?.SendingToken ?? default);
}
else if (result.IsCompleted)
{
@ -64,6 +76,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
}
finally
{
_connection?.StopSendCancellation();
_application.AdvanceTo(buffer.End);
}
}

View File

@ -231,7 +231,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
if (WebSocketCanSend(socket))
{
await socket.SendAsync(buffer, webSocketMessageType);
_connection.StartSendCancellation();
await socket.SendAsync(buffer, webSocketMessageType, _connection.SendingToken);
}
else
{
@ -254,6 +255,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
}
finally
{
_connection.StopSendCancellation();
_application.Input.AdvanceTo(buffer.End);
}
}

View File

@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Http.Connections
@ -15,19 +16,19 @@ namespace Microsoft.AspNetCore.Http.Connections
private const byte LineFeed = (byte)'\n';
public static async Task WriteMessageAsync(ReadOnlySequence<byte> payload, Stream output)
public static async Task WriteMessageAsync(ReadOnlySequence<byte> payload, Stream output, CancellationToken token)
{
// Payload does not contain a line feed so write it directly to output
if (payload.PositionOf(LineFeed) == null)
{
if (payload.Length > 0)
{
await output.WriteAsync(DataPrefix, 0, DataPrefix.Length);
await output.WriteAsync(payload);
await output.WriteAsync(Newline, 0, Newline.Length);
await output.WriteAsync(DataPrefix, 0, DataPrefix.Length, token);
await output.WriteAsync(payload, token);
await output.WriteAsync(Newline, 0, Newline.Length, token);
}
await output.WriteAsync(Newline, 0, Newline.Length);
await output.WriteAsync(Newline, 0, Newline.Length, token);
return;
}
@ -37,7 +38,7 @@ namespace Microsoft.AspNetCore.Http.Connections
await WriteMessageToMemory(ms, payload);
ms.Position = 0;
await ms.CopyToAsync(output);
await ms.CopyToAsync(output, token);
}
/// <summary>

View File

@ -1050,6 +1050,178 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
}
}
private class BlockingStream : Stream
{
private readonly SyncPoint _sync;
private bool _isSSE;
public BlockingStream(SyncPoint sync, bool isSSE = false)
{
_sync = sync;
_isSSE = isSSE;
}
public override bool CanRead => throw new NotImplementedException();
public override bool CanSeek => throw new NotImplementedException();
public override bool CanWrite => throw new NotImplementedException();
public override long Length => throw new NotImplementedException();
public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (_isSSE)
{
// SSE does an initial write of :\r\n that we want to ignore in testing
_isSSE = false;
return;
}
await _sync.WaitToContinue();
cancellationToken.ThrowIfCancellationRequested();
}
#if NETCOREAPP2_1
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
if (_isSSE)
{
// SSE does an initial write of :\r\n that we want to ignore in testing
_isSSE = false;
return;
}
await _sync.WaitToContinue();
cancellationToken.ThrowIfCancellationRequested();
}
#endif
}
[Fact]
[LogLevel(LogLevel.Debug)]
public async Task LongPollingConnectionClosesWhenSendTimeoutReached()
{
bool ExpectedErrors(WriteContext writeContext)
{
return (writeContext.LoggerName == typeof(Internal.Transports.LongPollingServerTransport).FullName &&
writeContext.EventId.Name == "LongPollingTerminated") ||
(writeContext.LoggerName == typeof(HttpConnectionManager).FullName && writeContext.EventId.Name == "FailedDispose");
}
using (StartVerifiableLog(expectedErrorsFilter: ExpectedErrors))
{
var manager = CreateConnectionManager(LoggerFactory);
var connection = manager.CreateConnection();
connection.TransportType = HttpTransportType.LongPolling;
var dispatcher = new HttpConnectionDispatcher(manager, LoggerFactory);
var context = MakeRequest("/foo", connection);
var services = new ServiceCollection();
services.AddSingleton<TestConnectionHandler>();
var builder = new ConnectionBuilder(services.BuildServiceProvider());
builder.UseConnectionHandler<TestConnectionHandler>();
var app = builder.Build();
var options = new HttpConnectionDispatcherOptions();
// First poll completes immediately
await dispatcher.ExecuteAsync(context, options, app).OrTimeout();
var sync = new SyncPoint();
context.Response.Body = new BlockingStream(sync);
var dispatcherTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Output.WriteAsync(new byte[] { 1 }).OrTimeout();
await sync.WaitForSyncPoint().OrTimeout();
// Cancel write to response body
connection.TryCancelSend(long.MaxValue);
sync.Continue();
await dispatcherTask.OrTimeout();
// Connection should be removed on canceled write
Assert.False(manager.TryGetConnection(connection.ConnectionId, out var _));
}
}
[Fact]
[LogLevel(LogLevel.Debug)]
public async Task SSEConnectionClosesWhenSendTimeoutReached()
{
using (StartVerifiableLog())
{
var manager = CreateConnectionManager(LoggerFactory);
var connection = manager.CreateConnection();
connection.TransportType = HttpTransportType.ServerSentEvents;
var dispatcher = new HttpConnectionDispatcher(manager, LoggerFactory);
var context = MakeRequest("/foo", connection);
SetTransport(context, connection.TransportType);
var services = new ServiceCollection();
services.AddSingleton<TestConnectionHandler>();
var builder = new ConnectionBuilder(services.BuildServiceProvider());
builder.UseConnectionHandler<TestConnectionHandler>();
var app = builder.Build();
var sync = new SyncPoint();
context.Response.Body = new BlockingStream(sync, isSSE: true);
var options = new HttpConnectionDispatcherOptions();
var dispatcherTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Output.WriteAsync(new byte[] { 1 }).OrTimeout();
await sync.WaitForSyncPoint().OrTimeout();
// Cancel write to response body
connection.TryCancelSend(long.MaxValue);
sync.Continue();
await dispatcherTask.OrTimeout();
// Connection should be removed on canceled write
Assert.False(manager.TryGetConnection(connection.ConnectionId, out var _));
}
}
[Fact]
[LogLevel(LogLevel.Debug)]
public async Task WebSocketConnectionClosesWhenSendTimeoutReached()
{
bool ExpectedErrors(WriteContext writeContext)
{
return writeContext.LoggerName == typeof(Internal.Transports.WebSocketsServerTransport).FullName &&
writeContext.EventId.Name == "ErrorWritingFrame";
}
using (StartVerifiableLog(expectedErrorsFilter: ExpectedErrors))
{
var manager = CreateConnectionManager(LoggerFactory);
var connection = manager.CreateConnection();
connection.TransportType = HttpTransportType.WebSockets;
var dispatcher = new HttpConnectionDispatcher(manager, LoggerFactory);
var sync = new SyncPoint();
var context = MakeRequest("/foo", connection);
SetTransport(context, connection.TransportType, sync);
var services = new ServiceCollection();
services.AddSingleton<TestConnectionHandler>();
var builder = new ConnectionBuilder(services.BuildServiceProvider());
builder.UseConnectionHandler<TestConnectionHandler>();
var app = builder.Build();
var options = new HttpConnectionDispatcherOptions();
options.WebSockets.CloseTimeout = TimeSpan.FromSeconds(0);
var dispatcherTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Output.WriteAsync(new byte[] { 1 }).OrTimeout();
await sync.WaitForSyncPoint().OrTimeout();
// Cancel write to response body
connection.TryCancelSend(long.MaxValue);
sync.Continue();
await dispatcherTask.OrTimeout();
// Connection should be removed on canceled write
Assert.False(manager.TryGetConnection(connection.ConnectionId, out var _));
}
}
[Fact]
[LogLevel(LogLevel.Trace)]
public async Task WebSocketTransportTimesOutWhenCloseFrameNotReceived()
@ -1622,6 +1794,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
Assert.Equal(StatusCodes.Status202Accepted, deleteContext.Response.StatusCode);
Assert.Equal("text/plain", deleteContext.Response.ContentType);
await connection.DisposeAndRemoveTask.OrTimeout();
// Verify the connection was removed from the manager
Assert.False(manager.TryGetConnection(connection.ConnectionToken, out _));
}
@ -1675,6 +1849,110 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
}
}
[Fact]
public async Task DeleteEndpointTerminatesLongPollingWithHangingApplication()
{
using (StartVerifiableLog())
{
var manager = CreateConnectionManager(LoggerFactory);
var pipeOptions = new PipeOptions(pauseWriterThreshold: 2, resumeWriterThreshold: 1);
var connection = manager.CreateConnection(pipeOptions, pipeOptions);
connection.TransportType = HttpTransportType.LongPolling;
var dispatcher = new HttpConnectionDispatcher(manager, LoggerFactory);
var context = MakeRequest("/foo", connection);
var services = new ServiceCollection();
services.AddSingleton<NeverEndingConnectionHandler>();
var builder = new ConnectionBuilder(services.BuildServiceProvider());
builder.UseConnectionHandler<NeverEndingConnectionHandler>();
var app = builder.Build();
var options = new HttpConnectionDispatcherOptions();
var pollTask = dispatcher.ExecuteAsync(context, options, app);
Assert.True(pollTask.IsCompleted);
// Now send the second poll
pollTask = dispatcher.ExecuteAsync(context, options, app);
// Issue the delete request and make sure the poll completes
var deleteContext = new DefaultHttpContext();
deleteContext.Request.Path = "/foo";
deleteContext.Request.QueryString = new QueryString($"?id={connection.ConnectionId}");
deleteContext.Request.Method = "DELETE";
Assert.False(pollTask.IsCompleted);
await dispatcher.ExecuteAsync(deleteContext, options, app).OrTimeout();
await pollTask.OrTimeout();
// Verify that transport shuts down
await connection.TransportTask.OrTimeout();
// Verify the response from the DELETE request
Assert.Equal(StatusCodes.Status202Accepted, deleteContext.Response.StatusCode);
Assert.Equal("text/plain", deleteContext.Response.ContentType);
Assert.Equal(HttpConnectionStatus.Disposed, connection.Status);
// Verify the connection not removed because application is hanging
Assert.True(manager.TryGetConnection(connection.ConnectionId, out _));
}
}
[Fact]
public async Task PollCanReceiveFinalMessageAfterAppCompletes()
{
using (StartVerifiableLog())
{
var transportType = HttpTransportType.LongPolling;
var manager = CreateConnectionManager(LoggerFactory);
var dispatcher = new HttpConnectionDispatcher(manager, LoggerFactory);
var connection = manager.CreateConnection();
connection.TransportType = transportType;
var waitForMessageTcs1 = new TaskCompletionSource<object>();
var messageTcs1 = new TaskCompletionSource<object>();
var waitForMessageTcs2 = new TaskCompletionSource<object>();
var messageTcs2 = new TaskCompletionSource<object>();
ConnectionDelegate connectionDelegate = async c =>
{
await waitForMessageTcs1.Task.OrTimeout();
await c.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Message1")).OrTimeout();
messageTcs1.TrySetResult(null);
await waitForMessageTcs2.Task.OrTimeout();
await c.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Message2")).OrTimeout();
messageTcs2.TrySetResult(null);
};
{
var options = new HttpConnectionDispatcherOptions();
var context = MakeRequest("/foo", connection);
await dispatcher.ExecuteAsync(context, options, connectionDelegate).OrTimeout();
// second poll should have data
waitForMessageTcs1.SetResult(null);
await messageTcs1.Task.OrTimeout();
var ms = new MemoryStream();
context.Response.Body = ms;
// Now send the second poll
await dispatcher.ExecuteAsync(context, options, connectionDelegate).OrTimeout();
Assert.Equal("Message1", Encoding.UTF8.GetString(ms.ToArray()));
waitForMessageTcs2.SetResult(null);
await messageTcs2.Task.OrTimeout();
context = MakeRequest("/foo", connection);
ms.Seek(0, SeekOrigin.Begin);
context.Response.Body = ms;
// This is the third poll which gets the final message after the app is complete
await dispatcher.ExecuteAsync(context, options, connectionDelegate).OrTimeout();
Assert.Equal("Message2", Encoding.UTF8.GetString(ms.ToArray()));
}
}
}
[Fact]
public async Task NegotiateDoesNotReturnWebSocketsWhenNotAvailable()
{
@ -1987,12 +2265,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
return context;
}
private static void SetTransport(HttpContext context, HttpTransportType transportType)
private static void SetTransport(HttpContext context, HttpTransportType transportType, SyncPoint sync = null)
{
switch (transportType)
{
case HttpTransportType.WebSockets:
context.Features.Set<IHttpWebSocketFeature>(new TestWebSocketConnectionFeature());
context.Features.Set<IHttpWebSocketFeature>(new TestWebSocketConnectionFeature(sync));
break;
case HttpTransportType.ServerSentEvents:
context.Request.Headers["Accept"] = "text/event-stream";

View File

@ -235,9 +235,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
try
{
Assert.True(result.IsCompleted);
// We should be able to write
await connection.Transport.Output.WriteAsync(new byte[] { 1 });
}
finally
{
@ -248,13 +245,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
connection.TransportTask = Task.Run(async () =>
{
var result = await connection.Application.Input.ReadAsync();
Assert.Equal(new byte[] { 1 }, result.Buffer.ToArray());
connection.Application.Input.AdvanceTo(result.Buffer.End);
result = await connection.Application.Input.ReadAsync();
try
{
Assert.True(result.IsCompleted);
Assert.True(result.IsCanceled);
}
finally
{

View File

@ -20,7 +20,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
var buffer = new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(payload));
var output = new MemoryStream();
await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, output);
await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, output, default);
Assert.Equal(encoded, Encoding.UTF8.GetString(output.ToArray()));
}
@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
var buffer = ReadOnlySequenceFactory.SegmentPerByteFactory.CreateWithContent(Encoding.UTF8.GetBytes(payload));
var output = new MemoryStream();
await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, output);
await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, output, default);
Assert.Equal(encoded, Encoding.UTF8.GetString(output.ToArray()));
}

View File

@ -5,11 +5,21 @@ using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Tests;
namespace Microsoft.AspNetCore.Http.Connections.Tests
{
internal class TestWebSocketConnectionFeature : IHttpWebSocketFeature, IDisposable
{
public TestWebSocketConnectionFeature()
{ }
public TestWebSocketConnectionFeature(SyncPoint sync)
{
_sync = sync;
}
private readonly SyncPoint _sync;
private readonly TaskCompletionSource<object> _accepted = new TaskCompletionSource<object>();
public bool IsWebSocketRequest => true;
@ -27,8 +37,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
var clientToServer = Channel.CreateUnbounded<WebSocketMessage>();
var serverToClient = Channel.CreateUnbounded<WebSocketMessage>();
var clientSocket = new WebSocketChannel(serverToClient.Reader, clientToServer.Writer);
var serverSocket = new WebSocketChannel(clientToServer.Reader, serverToClient.Writer);
var clientSocket = new WebSocketChannel(serverToClient.Reader, clientToServer.Writer, _sync);
var serverSocket = new WebSocketChannel(clientToServer.Reader, serverToClient.Writer, _sync);
Client = clientSocket;
SubProtocol = context.SubProtocol;
@ -45,16 +55,18 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
{
private readonly ChannelReader<WebSocketMessage> _input;
private readonly ChannelWriter<WebSocketMessage> _output;
private readonly SyncPoint _sync;
private WebSocketCloseStatus? _closeStatus;
private string _closeStatusDescription;
private WebSocketState _state;
private WebSocketMessage _internalBuffer = new WebSocketMessage();
public WebSocketChannel(ChannelReader<WebSocketMessage> input, ChannelWriter<WebSocketMessage> output)
public WebSocketChannel(ChannelReader<WebSocketMessage> input, ChannelWriter<WebSocketMessage> output, SyncPoint sync = null)
{
_input = input;
_output = output;
_sync = sync;
}
public override WebSocketCloseStatus? CloseStatus => _closeStatus;
@ -173,11 +185,17 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
throw new InvalidOperationException("Unexpected close");
}
public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
public override async Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
{
if (_sync != null)
{
await _sync.WaitToContinue();
}
cancellationToken.ThrowIfCancellationRequested();
var copy = new byte[buffer.Count];
Buffer.BlockCopy(buffer.Array, buffer.Offset, copy, 0, buffer.Count);
return SendMessageAsync(new WebSocketMessage
await SendMessageAsync(new WebSocketMessage
{
Buffer = copy,
MessageType = messageType,

View File

@ -77,7 +77,16 @@ namespace System.IO.Pipelines
_length += source.Length;
var task = _pipeWriter.WriteAsync(source);
if (!task.IsCompletedSuccessfully)
if (task.IsCompletedSuccessfully)
{
// Cancellation can be triggered by PipeWriter.CancelPendingFlush
if (task.Result.IsCanceled)
{
throw new OperationCanceledException();
}
}
else
{
return WriteSlowAsync(task);
}

View File

@ -37,9 +37,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public TransferFormat ActiveFormat { get; set; }
public TestClient(IHubProtocol protocol = null, IInvocationBinder invocationBinder = null, string userIdentifier = null)
public TestClient(IHubProtocol protocol = null, IInvocationBinder invocationBinder = null, string userIdentifier = null, long pauseWriterThreshold = 32768)
{
var options = new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
var options = new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false,
pauseWriterThreshold: pauseWriterThreshold, resumeWriterThreshold: pauseWriterThreshold / 2);
var pair = DuplexPipe.CreateConnectionPair(options, options);
Connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), pair.Transport, pair.Application);
@ -70,16 +71,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
if (sendHandshakeRequestMessage)
{
var memoryBufferWriter = MemoryBufferWriter.Get();
try
{
HandshakeProtocol.WriteRequestMessage(new HandshakeRequestMessage(_protocol.Name, _protocol.Version), memoryBufferWriter);
await Connection.Application.Output.WriteAsync(memoryBufferWriter.ToArray());
}
finally
{
MemoryBufferWriter.Return(memoryBufferWriter);
}
await Connection.Application.Output.WriteAsync(GetHandshakeRequestMessage());
}
var connection = handler.OnConnectedAsync(Connection);
@ -257,7 +249,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
else
{
// read first message out of the incoming data
// read first message out of the incoming data
if (HandshakeProtocol.TryParseResponseMessage(ref buffer, out var responseMessage))
{
return responseMessage;
@ -312,6 +304,20 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
public byte[] GetHandshakeRequestMessage()
{
var memoryBufferWriter = MemoryBufferWriter.Get();
try
{
HandshakeProtocol.WriteRequestMessage(new HandshakeRequestMessage(_protocol.Name, _protocol.Version), memoryBufferWriter);
return memoryBufferWriter.ToArray();
}
finally
{
MemoryBufferWriter.Return(memoryBufferWriter);
}
}
private class DefaultInvocationBinder : IInvocationBinder
{
public IReadOnlyList<Type> GetParameterTypes(string methodName)

View File

@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
_parser = new ServerSentEventsMessageParser();
_rawData = new ReadOnlySequence<byte>(protocol.GetMessageBytes(hubMessage));
var ms = new MemoryStream();
ServerSentEventsMessageFormatter.WriteMessageAsync(_rawData, ms).GetAwaiter().GetResult();
ServerSentEventsMessageFormatter.WriteMessageAsync(_rawData, ms, default).GetAwaiter().GetResult();
_sseFormattedData = ms.ToArray();
}
@ -81,7 +81,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
[Benchmark]
public Task WriteSingleMessage()
{
return ServerSentEventsMessageFormatter.WriteMessageAsync(_rawData, Stream.Null);
return ServerSentEventsMessageFormatter.WriteMessageAsync(_rawData, Stream.Null, default);
}
public enum Message

View File

@ -82,10 +82,10 @@ namespace Microsoft.AspNetCore.SignalR
/// <inheritdoc />
public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, null);
return SendToAllConnections(methodName, args, include: null, cancellationToken);
}
private Task SendToAllConnections(string methodName, object[] args, Func<HubConnectionContext, bool> include)
private Task SendToAllConnections(string methodName, object[] args, Func<HubConnectionContext, bool> include, CancellationToken cancellationToken)
{
List<Task> tasks = null;
SerializedHubMessage message = null;
@ -103,7 +103,7 @@ namespace Microsoft.AspNetCore.SignalR
message = CreateSerializedInvocationMessage(methodName, args);
}
var task = connection.WriteAsync(message);
var task = connection.WriteAsync(message, cancellationToken);
if (!task.IsCompletedSuccessfully)
{
@ -127,7 +127,8 @@ namespace Microsoft.AspNetCore.SignalR
// Tasks and message are passed by ref so they can be lazily created inside the method post-filtering,
// while still being re-usable when sending to multiple groups
private void SendToGroupConnections(string methodName, object[] args, ConcurrentDictionary<string, HubConnectionContext> connections, Func<HubConnectionContext, bool> include, ref List<Task> tasks, ref SerializedHubMessage message)
private void SendToGroupConnections(string methodName, object[] args, ConcurrentDictionary<string, HubConnectionContext> connections, Func<HubConnectionContext, bool> include,
ref List<Task> tasks, ref SerializedHubMessage message, CancellationToken cancellationToken)
{
// foreach over ConcurrentDictionary avoids allocating an enumerator
foreach (var connection in connections)
@ -142,7 +143,7 @@ namespace Microsoft.AspNetCore.SignalR
message = CreateSerializedInvocationMessage(methodName, args);
}
var task = connection.Value.WriteAsync(message);
var task = connection.Value.WriteAsync(message, cancellationToken);
if (!task.IsCompletedSuccessfully)
{
@ -175,7 +176,7 @@ namespace Microsoft.AspNetCore.SignalR
// Write message directly to connection without caching it in memory
var message = CreateInvocationMessage(methodName, args);
return connection.WriteAsync(message).AsTask();
return connection.WriteAsync(message, cancellationToken).AsTask();
}
/// <inheritdoc />
@ -193,7 +194,7 @@ namespace Microsoft.AspNetCore.SignalR
// group might be modified inbetween checking and sending
List<Task> tasks = null;
SerializedHubMessage message = null;
SendToGroupConnections(methodName, args, group, null, ref tasks, ref message);
SendToGroupConnections(methodName, args, group, null, ref tasks, ref message, cancellationToken);
if (tasks != null)
{
@ -221,7 +222,7 @@ namespace Microsoft.AspNetCore.SignalR
var group = _groups[groupName];
if (group != null)
{
SendToGroupConnections(methodName, args, group, null, ref tasks, ref message);
SendToGroupConnections(methodName, args, group, null, ref tasks, ref message, cancellationToken);
}
}
@ -247,7 +248,7 @@ namespace Microsoft.AspNetCore.SignalR
List<Task> tasks = null;
SerializedHubMessage message = null;
SendToGroupConnections(methodName, args, group, connection => !excludedConnectionIds.Contains(connection.ConnectionId), ref tasks, ref message);
SendToGroupConnections(methodName, args, group, connection => !excludedConnectionIds.Contains(connection.ConnectionId), ref tasks, ref message, cancellationToken);
if (tasks != null)
{
@ -271,7 +272,7 @@ namespace Microsoft.AspNetCore.SignalR
/// <inheritdoc />
public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => string.Equals(connection.UserIdentifier, userId, StringComparison.Ordinal));
return SendToAllConnections(methodName, args, connection => string.Equals(connection.UserIdentifier, userId, StringComparison.Ordinal), cancellationToken);
}
/// <inheritdoc />
@ -292,19 +293,19 @@ namespace Microsoft.AspNetCore.SignalR
/// <inheritdoc />
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => !excludedConnectionIds.Contains(connection.ConnectionId));
return SendToAllConnections(methodName, args, connection => !excludedConnectionIds.Contains(connection.ConnectionId), cancellationToken);
}
/// <inheritdoc />
public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => connectionIds.Contains(connection.ConnectionId));
return SendToAllConnections(methodName, args, connection => connectionIds.Contains(connection.ConnectionId), cancellationToken);
}
/// <inheritdoc />
public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => userIds.Contains(connection.UserIdentifier));
return SendToAllConnections(methodName, args, connection => userIds.Contains(connection.UserIdentifier), cancellationToken);
}
}
}

View File

@ -34,6 +34,8 @@ namespace Microsoft.AspNetCore.SignalR
private readonly long _keepAliveInterval;
private readonly long _clientTimeoutInterval;
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1);
private readonly bool _useAbsoluteClientTimeout;
private readonly object _receiveMessageTimeoutLock = new object();
private StreamTracker _streamTracker;
private long _lastSendTimeStamp = DateTime.UtcNow.Ticks;
@ -41,10 +43,13 @@ namespace Microsoft.AspNetCore.SignalR
private bool _receivedMessageThisInterval = false;
private ReadOnlyMemory<byte> _cachedPingMessage;
private bool _clientTimeoutActive;
private bool _connectionAborted;
private volatile bool _connectionAborted;
private volatile bool _allowReconnect = true;
private int _streamBufferCapacity;
private long? _maxMessageSize;
private bool _receivedMessageTimeoutEnabled = false;
private long _receivedMessageElapsedTicks = 0;
private long _receivedMessageTimestamp;
/// <summary>
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
@ -64,6 +69,11 @@ namespace Microsoft.AspNetCore.SignalR
ConnectionAborted = _connectionAbortedTokenSource.Token;
HubCallerContext = new DefaultHubCallerContext(this);
if (AppContext.TryGetSwitch("Microsoft.AspNetCore.SignalR.UseAbsoluteClientTimeout", out var useAbsoluteClientTimeout))
{
_useAbsoluteClientTimeout = useAbsoluteClientTimeout;
}
}
internal StreamTracker StreamTracker
@ -131,7 +141,7 @@ namespace Microsoft.AspNetCore.SignalR
// Try to grab the lock synchronously, if we fail, go to the slower path
if (!_writeLock.Wait(0))
{
return new ValueTask(WriteSlowAsync(message));
return new ValueTask(WriteSlowAsync(message, cancellationToken));
}
if (_connectionAborted)
@ -141,7 +151,7 @@ namespace Microsoft.AspNetCore.SignalR
}
// This method should never throw synchronously
var task = WriteCore(message);
var task = WriteCore(message, cancellationToken);
// The write didn't complete synchronously so await completion
if (!task.IsCompletedSuccessfully)
@ -167,7 +177,7 @@ namespace Microsoft.AspNetCore.SignalR
// Try to grab the lock synchronously, if we fail, go to the slower path
if (!_writeLock.Wait(0))
{
return new ValueTask(WriteSlowAsync(message));
return new ValueTask(WriteSlowAsync(message, cancellationToken));
}
if (_connectionAborted)
@ -177,7 +187,7 @@ namespace Microsoft.AspNetCore.SignalR
}
// This method should never throw synchronously
var task = WriteCore(message);
var task = WriteCore(message, cancellationToken);
// The write didn't complete synchronously so await completion
if (!task.IsCompletedSuccessfully)
@ -191,7 +201,7 @@ namespace Microsoft.AspNetCore.SignalR
return default;
}
private ValueTask<FlushResult> WriteCore(HubMessage message)
private ValueTask<FlushResult> WriteCore(HubMessage message, CancellationToken cancellationToken)
{
try
{
@ -199,7 +209,7 @@ namespace Microsoft.AspNetCore.SignalR
// write it without caching.
Protocol.WriteMessage(message, _connectionContext.Transport.Output);
return _connectionContext.Transport.Output.FlushAsync();
return _connectionContext.Transport.Output.FlushAsync(cancellationToken);
}
catch (Exception ex)
{
@ -211,14 +221,14 @@ namespace Microsoft.AspNetCore.SignalR
}
}
private ValueTask<FlushResult> WriteCore(SerializedHubMessage message)
private ValueTask<FlushResult> WriteCore(SerializedHubMessage message, CancellationToken cancellationToken)
{
try
{
// Grab a preserialized buffer for this protocol.
var buffer = message.GetSerializedMessage(Protocol);
return _connectionContext.Transport.Output.WriteAsync(buffer);
return _connectionContext.Transport.Output.WriteAsync(buffer, cancellationToken);
}
catch (Exception ex)
{
@ -249,10 +259,10 @@ namespace Microsoft.AspNetCore.SignalR
}
}
private async Task WriteSlowAsync(HubMessage message)
private async Task WriteSlowAsync(HubMessage message, CancellationToken cancellationToken)
{
// Failed to get the lock immediately when entering WriteAsync so await until it is available
await _writeLock.WaitAsync();
await _writeLock.WaitAsync(cancellationToken);
try
{
@ -261,7 +271,7 @@ namespace Microsoft.AspNetCore.SignalR
return;
}
await WriteCore(message);
await WriteCore(message, cancellationToken);
}
catch (Exception ex)
{
@ -274,7 +284,7 @@ namespace Microsoft.AspNetCore.SignalR
}
}
private async Task WriteSlowAsync(SerializedHubMessage message)
private async Task WriteSlowAsync(SerializedHubMessage message, CancellationToken cancellationToken)
{
// Failed to get the lock immediately when entering WriteAsync so await until it is available
await _writeLock.WaitAsync();
@ -286,7 +296,7 @@ namespace Microsoft.AspNetCore.SignalR
return;
}
await WriteCore(message);
await WriteCore(message, cancellationToken);
}
catch (Exception ex)
{
@ -370,6 +380,9 @@ namespace Microsoft.AspNetCore.SignalR
private void AbortAllowReconnect()
{
_connectionAborted = true;
// Cancel any current writes or writes that are about to happen and have already gone past the _connectionAborted bool
// We have to do this outside of the lock otherwise it could hang if the write is observing backpressure
_connectionContext.Transport.Output.CancelPendingFlush();
// If we already triggered the token then noop, this isn't thread safe but it's good enough
// to avoid spawning a new task in the most common cases
@ -525,9 +538,23 @@ namespace Microsoft.AspNetCore.SignalR
internal Task AbortAsync()
{
AbortAllowReconnect();
// Acquire lock to make sure all writes are completed
if (!_writeLock.Wait(0))
{
return AbortAsyncSlow();
}
_writeLock.Release();
return _abortCompletedTcs.Task;
}
private async Task AbortAsyncSlow()
{
await _writeLock.WaitAsync();
_writeLock.Release();
await _abortCompletedTcs.Task;
}
private void KeepAliveTick()
{
var currentTime = DateTime.UtcNow.Ticks;
@ -564,17 +591,41 @@ namespace Microsoft.AspNetCore.SignalR
private void CheckClientTimeout()
{
// If it's been too long since we've heard from the client, then close this
if (DateTime.UtcNow.Ticks - Volatile.Read(ref _lastReceivedTimeStamp) > _clientTimeoutInterval)
if (Debugger.IsAttached)
{
if (!_receivedMessageThisInterval)
{
Log.ClientTimeout(_logger, TimeSpan.FromTicks(_clientTimeoutInterval));
AbortAllowReconnect();
}
return;
}
_receivedMessageThisInterval = false;
Volatile.Write(ref _lastReceivedTimeStamp, DateTime.UtcNow.Ticks);
if (_useAbsoluteClientTimeout)
{
// If it's been too long since we've heard from the client, then close this
if (DateTime.UtcNow.Ticks - Volatile.Read(ref _lastReceivedTimeStamp) > _clientTimeoutInterval)
{
if (!_receivedMessageThisInterval)
{
Log.ClientTimeout(_logger, TimeSpan.FromTicks(_clientTimeoutInterval));
AbortAllowReconnect();
}
_receivedMessageThisInterval = false;
Volatile.Write(ref _lastReceivedTimeStamp, DateTime.UtcNow.Ticks);
}
}
else
{
lock (_receiveMessageTimeoutLock)
{
if (_receivedMessageTimeoutEnabled)
{
_receivedMessageElapsedTicks = DateTime.UtcNow.Ticks - _receivedMessageTimestamp;
if (_receivedMessageElapsedTicks >= _clientTimeoutInterval)
{
Log.ClientTimeout(_logger, TimeSpan.FromTicks(_clientTimeoutInterval));
AbortAllowReconnect();
}
}
}
}
}
@ -623,6 +674,35 @@ namespace Microsoft.AspNetCore.SignalR
_receivedMessageThisInterval = true;
}
internal void BeginClientTimeout()
{
// check if new timeout behavior is in use
if (!_useAbsoluteClientTimeout)
{
lock (_receiveMessageTimeoutLock)
{
_receivedMessageTimeoutEnabled = true;
_receivedMessageTimestamp = DateTime.UtcNow.Ticks;
}
}
}
internal void StopClientTimeout()
{
// check if new timeout behavior is in use
if (!_useAbsoluteClientTimeout)
{
lock (_receiveMessageTimeoutLock)
{
// we received a message so stop the timer and reset it
// it will resume after the message has been processed
_receivedMessageElapsedTicks = 0;
_receivedMessageTimestamp = 0;
_receivedMessageTimeoutEnabled = false;
}
}
}
private static class Log
{
// Category: HubConnectionContext

View File

@ -213,6 +213,8 @@ namespace Microsoft.AspNetCore.SignalR
{
var input = connection.Input;
var protocol = connection.Protocol;
connection.BeginClientTimeout();
var binder = new HubConnectionBinder<THub>(_dispatcher, connection);
@ -221,6 +223,8 @@ namespace Microsoft.AspNetCore.SignalR
var result = await input.ReadAsync();
var buffer = result.Buffer;
connection.ResetClientTimeout();
try
{
if (result.IsCanceled)
@ -230,15 +234,21 @@ namespace Microsoft.AspNetCore.SignalR
if (!buffer.IsEmpty)
{
connection.ResetClientTimeout();
bool messageReceived = false;
// No message limit, just parse and dispatch
if (_maximumMessageSize == null)
{
while (protocol.TryParseMessage(ref buffer, binder, out var message))
{
messageReceived = true;
connection.StopClientTimeout();
await _dispatcher.DispatchMessageAsync(connection, message);
}
if (messageReceived)
{
connection.BeginClientTimeout();
}
}
else
{
@ -258,6 +268,9 @@ namespace Microsoft.AspNetCore.SignalR
if (protocol.TryParseMessage(ref segment, binder, out var message))
{
messageReceived = true;
connection.StopClientTimeout();
await _dispatcher.DispatchMessageAsync(connection, message);
}
else if (overLength)
@ -273,6 +286,11 @@ namespace Microsoft.AspNetCore.SignalR
// Update the buffer to the remaining segment
buffer = buffer.Slice(segment.Start);
}
if (messageReceived)
{
connection.BeginClientTimeout();
}
}
}

View File

@ -105,7 +105,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendAllAsync(method, args);
return _lifetimeManager.SendAllAsync(method, args, cancellationToken);
}
}

View File

@ -1,9 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Specification.Tests;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.AspNetCore.SignalR.Specification.Tests;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Tests
{
@ -13,5 +18,241 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
return new DefaultHubLifetimeManager<MyHub>(new Logger<DefaultHubLifetimeManager<MyHub>>(NullLoggerFactory.Instance));
}
[Fact]
public async Task SendAllAsyncWillCancelWithToken()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
var connection2 = HubConnectionContextUtils.Create(client2.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendAllAsync("Hello", new object[] { "World" }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var message = Assert.IsType<InvocationMessage>(client1.TryRead());
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection2.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
Assert.False(connection1.ConnectionAborted.IsCancellationRequested);
}
}
[Fact]
public async Task SendAllExceptAsyncWillCancelWithToken()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
var connection2 = HubConnectionContextUtils.Create(client2.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendAllExceptAsync("Hello", new object[] { "World" }, new List<string> { connection1.ConnectionId }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection2.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
Assert.False(connection1.ConnectionAborted.IsCancellationRequested);
Assert.Null(client1.TryRead());
}
}
[Fact]
public async Task SendConnectionAsyncWillCancelWithToken()
{
using (var client1 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendConnectionAsync(connection1.ConnectionId, "Hello", new object[] { "World" }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection1.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
}
}
[Fact]
public async Task SendConnectionsAsyncWillCancelWithToken()
{
using (var client1 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendConnectionsAsync(new List<string> { connection1.ConnectionId }, "Hello", new object[] { "World" }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection1.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
}
}
[Fact]
public async Task SendGroupAsyncWillCancelWithToken()
{
using (var client1 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.AddToGroupAsync(connection1.ConnectionId, "group").OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendGroupAsync("group", "Hello", new object[] { "World" }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection1.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
}
}
[Fact]
public async Task SendGroupExceptAsyncWillCancelWithToken()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
var connection2 = HubConnectionContextUtils.Create(client2.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
await manager.AddToGroupAsync(connection1.ConnectionId, "group").OrTimeout();
await manager.AddToGroupAsync(connection2.ConnectionId, "group").OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendGroupExceptAsync("group", "Hello", new object[] { "World" }, new List<string> { connection1.ConnectionId }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection2.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
Assert.False(connection1.ConnectionAborted.IsCancellationRequested);
Assert.Null(client1.TryRead());
}
}
[Fact]
public async Task SendGroupsAsyncWillCancelWithToken()
{
using (var client1 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.AddToGroupAsync(connection1.ConnectionId, "group").OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendGroupsAsync(new List<string> { "group" }, "Hello", new object[] { "World" }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection1.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
}
}
[Fact]
public async Task SendUserAsyncWillCancelWithToken()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection, userIdentifier: "user");
var connection2 = HubConnectionContextUtils.Create(client2.Connection, userIdentifier: "user");
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendUserAsync("user", "Hello", new object[] { "World" }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var message = Assert.IsType<InvocationMessage>(client1.TryRead());
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection2.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
Assert.False(connection1.ConnectionAborted.IsCancellationRequested);
}
}
[Fact]
public async Task SendUsersAsyncWillCancelWithToken()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient(pauseWriterThreshold: 2))
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection, userIdentifier: "user1");
var connection2 = HubConnectionContextUtils.Create(client2.Connection, userIdentifier: "user2");
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
var cts = new CancellationTokenSource();
var sendTask = manager.SendUsersAsync(new List<string> { "user1", "user2" }, "Hello", new object[] { "World" }, cts.Token).OrTimeout();
Assert.False(sendTask.IsCompleted);
cts.Cancel();
await sendTask.OrTimeout();
var message = Assert.IsType<InvocationMessage>(client1.TryRead());
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
connection2.ConnectionAborted.Register(t =>
{
((TaskCompletionSource<object>)t).SetResult(null);
}, tcs);
await tcs.Task.OrTimeout();
Assert.False(connection1.ConnectionAborted.IsCancellationRequested);
}
}
}
}

View File

@ -2798,6 +2798,47 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout()
{
using (StartVerifiableLog())
{
var tcsService = new TcsService();
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services =>
{
services.Configure<HubOptions>(options =>
options.ClientTimeoutInterval = TimeSpan.FromMilliseconds(0));
services.AddSingleton(tcsService);
}, LoggerFactory);
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<LongRunningHub>>();
using (var client = new TestClient(new JsonHubProtocol()))
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
// This starts the timeout logic
await client.SendHubMessageAsync(PingMessage.Instance);
// Call long running hub method
var hubMethodTask = client.InvokeAsync(nameof(LongRunningHub.LongRunningMethod));
await tcsService.StartedMethod.Task.OrTimeout();
// Tick heartbeat while hub method is running to show that close isn't triggered
client.TickHeartbeat();
// Unblock long running hub method
tcsService.EndMethod.SetResult(null);
await hubMethodTask.OrTimeout();
// Tick heartbeat again now that we're outside of the hub method
client.TickHeartbeat();
// Connection is closed
await connectionHandlerTask.OrTimeout();
}
}
}
[Fact]
public async Task EndingConnectionSendsCloseMessageWithNoError()
{