From 28240ce46066335ecddb166b536162764bcefd82 Mon Sep 17 00:00:00 2001 From: Brennan Date: Thu, 16 Jan 2020 14:24:34 -0800 Subject: [PATCH] Fix flaky HubConnectionHandler test (#18391) --- .../SignalR/test/HubConnectionHandlerTests.cs | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 064aeb8ebc..c3911252a5 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -6,9 +6,11 @@ using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.IO.Pipelines; using System.Linq; using System.Security.Claims; using System.Text; +using System.Threading; using System.Threading.Tasks; using MessagePack; using MessagePack.Formatters; @@ -2797,6 +2799,78 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } + internal class PipeReaderWrapper : PipeReader + { + private readonly PipeReader _originalPipeReader; + private TaskCompletionSource _waitForRead; + private object _lock = new object(); + + public PipeReaderWrapper(PipeReader pipeReader) + { + _originalPipeReader = pipeReader; + _waitForRead = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public override void AdvanceTo(SequencePosition consumed) => + _originalPipeReader.AdvanceTo(consumed); + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => + _originalPipeReader.AdvanceTo(consumed, examined); + + public override void CancelPendingRead() => + _originalPipeReader.CancelPendingRead(); + + public override void Complete(Exception exception = null) => + _originalPipeReader.Complete(exception); + + public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + lock (_lock) + { + _waitForRead.SetResult(null); + } + + try + { + return await _originalPipeReader.ReadAsync(cancellationToken); + } + finally + { + lock (_lock) + { + _waitForRead = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + } + } + + public override bool TryRead(out ReadResult result) => + _originalPipeReader.TryRead(out result); + + public Task WaitForReadStart() + { + lock (_lock) + { + return _waitForRead.Task; + } + } + } + + internal class CustomDuplex : IDuplexPipe + { + private readonly IDuplexPipe _originalDuplexPipe; + public readonly PipeReaderWrapper WrappedPipeReader; + + public CustomDuplex(IDuplexPipe duplexPipe) + { + _originalDuplexPipe = duplexPipe; + WrappedPipeReader = new PipeReaderWrapper(_originalDuplexPipe.Input); + } + + public PipeReader Input => WrappedPipeReader; + + public PipeWriter Output => _originalDuplexPipe.Output; + } + [Fact] public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() { @@ -2813,6 +2887,9 @@ namespace Microsoft.AspNetCore.SignalR.Tests using (var client = new TestClient(new JsonHubProtocol())) { + var customDuplex = new CustomDuplex(client.Connection.Transport); + client.Connection.Transport = customDuplex; + var connectionHandlerTask = await client.ConnectAsync(connectionHandler); // This starts the timeout logic await client.SendHubMessageAsync(PingMessage.Instance); @@ -2829,6 +2906,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests await hubMethodTask.OrTimeout(); + // There is a small window when the hub method finishes and the timer starts again + // So we need to delay a little before ticking the heart beat. + // We do this by waiting until we know the HubConnectionHandler code is in pipe.ReadAsync() + await customDuplex.WrappedPipeReader.WaitForReadStart().OrTimeout(); + // Tick heartbeat again now that we're outside of the hub method client.TickHeartbeat();