Small HubConnection cleanup (#8643)

This commit is contained in:
David Fowler 2019-03-21 13:30:56 -10:00 committed by Mikael Mengistu
parent 0c4b3fbd6d
commit 0d6e0637af
1 changed files with 18 additions and 6 deletions

View File

@ -59,6 +59,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
private readonly ConnectionLogScope _logScope;
// The receive loop has a single reader and single writer at a time so optimize the channel for that
private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = true
};
// Transient state to a connection
private ConnectionState _connectionState;
private int _serverProtocolMinorVersion;
@ -823,6 +830,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
throw new InvalidOperationException("The server disconnected before the handshake was completed");
}
var input = startingConnectionState.Connection.Transport.Input;
try
{
using (var handshakeCts = new CancellationTokenSource(HandshakeTimeout))
@ -830,7 +839,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
while (true)
{
var result = await startingConnectionState.Connection.Transport.Input.ReadAsync(cts.Token);
var result = await input.ReadAsync(cts.Token);
var buffer = result.Buffer;
var consumed = buffer.Start;
@ -871,7 +880,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
finally
{
startingConnectionState.Connection.Transport.Input.AdvanceTo(consumed, examined);
input.AdvanceTo(consumed, examined);
}
}
}
@ -899,11 +908,11 @@ namespace Microsoft.AspNetCore.SignalR.Client
// Performs periodic tasks -- here sending pings and checking timeout
// Disposed with `timer.Stop()` in the finally block below
var timer = new TimerAwaitable(TickRate, TickRate);
_ = TimerLoop(timer);
var timerTask = TimerLoop(timer);
var uploadStreamSource = new CancellationTokenSource();
_uploadStreamToken = uploadStreamSource.Token;
var invocationMessageChannel = Channel.CreateUnbounded<InvocationMessage>();
var invocationMessageChannel = Channel.CreateUnbounded<InvocationMessage>(_receiveLoopOptions);
var invocationMessageReceiveTask = StartProcessingInvocationMessages(invocationMessageChannel.Reader);
async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> invocationMessageChannelReader)
@ -917,11 +926,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
var input = connectionState.Connection.Transport.Input;
try
{
while (true)
{
var result = await connectionState.Connection.Transport.Input.ReadAsync();
var result = await input.ReadAsync();
var buffer = result.Buffer;
try
@ -972,7 +983,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// The buffer was sliced up to where it was consumed, so we can just advance to the start.
// We mark examined as `buffer.End` so that if we didn't receive a full frame, we'll wait for more data
// before yielding the read again.
connectionState.Connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End);
input.AdvanceTo(buffer.Start, buffer.End);
}
}
}
@ -986,6 +997,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
invocationMessageChannel.Writer.TryComplete();
await invocationMessageReceiveTask;
timer.Stop();
await timerTask;
uploadStreamSource.Cancel();
}