diff --git a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs index cde0dea2a9..e1f20238d0 100644 --- a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs +++ b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs @@ -59,6 +59,13 @@ namespace Microsoft.AspNetCore.SignalR.Client private readonly ConnectionLogScope _logScope; + // The receive loop has a single reader and single writer at a time so optimize the channel for that + private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = true + }; + // Transient state to a connection private ConnectionState _connectionState; private int _serverProtocolMinorVersion; @@ -823,6 +830,8 @@ namespace Microsoft.AspNetCore.SignalR.Client throw new InvalidOperationException("The server disconnected before the handshake was completed"); } + var input = startingConnectionState.Connection.Transport.Input; + try { using (var handshakeCts = new CancellationTokenSource(HandshakeTimeout)) @@ -830,7 +839,7 @@ namespace Microsoft.AspNetCore.SignalR.Client { while (true) { - var result = await startingConnectionState.Connection.Transport.Input.ReadAsync(cts.Token); + var result = await input.ReadAsync(cts.Token); var buffer = result.Buffer; var consumed = buffer.Start; @@ -871,7 +880,7 @@ namespace Microsoft.AspNetCore.SignalR.Client } finally { - startingConnectionState.Connection.Transport.Input.AdvanceTo(consumed, examined); + input.AdvanceTo(consumed, examined); } } } @@ -899,11 +908,11 @@ namespace Microsoft.AspNetCore.SignalR.Client // Performs periodic tasks -- here sending pings and checking timeout // Disposed with `timer.Stop()` in the finally block below var timer = new TimerAwaitable(TickRate, TickRate); - _ = TimerLoop(timer); + var timerTask = TimerLoop(timer); var uploadStreamSource = new CancellationTokenSource(); _uploadStreamToken = uploadStreamSource.Token; - var invocationMessageChannel = Channel.CreateUnbounded(); + var invocationMessageChannel = Channel.CreateUnbounded(_receiveLoopOptions); var invocationMessageReceiveTask = StartProcessingInvocationMessages(invocationMessageChannel.Reader); async Task StartProcessingInvocationMessages(ChannelReader invocationMessageChannelReader) @@ -917,11 +926,13 @@ namespace Microsoft.AspNetCore.SignalR.Client } } + var input = connectionState.Connection.Transport.Input; + try { while (true) { - var result = await connectionState.Connection.Transport.Input.ReadAsync(); + var result = await input.ReadAsync(); var buffer = result.Buffer; try @@ -972,7 +983,7 @@ namespace Microsoft.AspNetCore.SignalR.Client // The buffer was sliced up to where it was consumed, so we can just advance to the start. // We mark examined as `buffer.End` so that if we didn't receive a full frame, we'll wait for more data // before yielding the read again. - connectionState.Connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End); + input.AdvanceTo(buffer.Start, buffer.End); } } } @@ -986,6 +997,7 @@ namespace Microsoft.AspNetCore.SignalR.Client invocationMessageChannel.Writer.TryComplete(); await invocationMessageReceiveTask; timer.Stop(); + await timerTask; uploadStreamSource.Cancel(); }