[Blazor][Fixes #8003] Improves reconnects when the client doesn't perform graceful disconnects (#12327)

* Allows the server to accept acks with a higher sequence number than the first queued pending render and catches up to it.
* Makes the client send acks for previous render batches.
* Makes the client repeat acks for errored render batches if it keeps receiving new render batches.
Client awaits sending acks to ensure that they get sent in order.
This commit is contained in:
Javier Calvarro Nelson 2019-07-20 02:16:32 +02:00 committed by GitHub
parent 3b6216c113
commit 470bfddf92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 214 additions and 40 deletions

View File

@ -52,7 +52,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
_logger = logger; _logger = logger;
} }
internal ConcurrentQueue<PendingRender> PendingRenderBatches = new ConcurrentQueue<PendingRender>(); internal ConcurrentQueue<UnacknowledgedRenderBatch> UnacknowledgedRenderBatches = new ConcurrentQueue<UnacknowledgedRenderBatch>();
public override Dispatcher Dispatcher { get; } = Dispatcher.CreateDefault(); public override Dispatcher Dispatcher { get; } = Dispatcher.CreateDefault();
@ -102,7 +102,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
{ {
_disposing = true; _disposing = true;
_rendererRegistry.TryRemove(Id); _rendererRegistry.TryRemove(Id);
while (PendingRenderBatches.TryDequeue(out var entry)) while (UnacknowledgedRenderBatches.TryDequeue(out var entry))
{ {
entry.CompletionSource.TrySetCanceled(); entry.CompletionSource.TrySetCanceled();
entry.Data.Dispose(); entry.Data.Dispose();
@ -125,7 +125,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
// snapshot its contents now. // snapshot its contents now.
var arrayBuilder = new ArrayBuilder<byte>(2048); var arrayBuilder = new ArrayBuilder<byte>(2048);
using var memoryStream = new ArrayBuilderMemoryStream(arrayBuilder); using var memoryStream = new ArrayBuilderMemoryStream(arrayBuilder);
PendingRender pendingRender; UnacknowledgedRenderBatch pendingRender;
try try
{ {
using (var renderBatchWriter = new RenderBatchWriter(memoryStream, false)) using (var renderBatchWriter = new RenderBatchWriter(memoryStream, false))
@ -135,7 +135,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
var renderId = Interlocked.Increment(ref _nextRenderId); var renderId = Interlocked.Increment(ref _nextRenderId);
pendingRender = new PendingRender( pendingRender = new UnacknowledgedRenderBatch(
renderId, renderId,
arrayBuilder, arrayBuilder,
new TaskCompletionSource<object>()); new TaskCompletionSource<object>());
@ -143,7 +143,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
// Buffer the rendered batches no matter what. We'll send it down immediately when the client // Buffer the rendered batches no matter what. We'll send it down immediately when the client
// is connected or right after the client reconnects. // is connected or right after the client reconnects.
PendingRenderBatches.Enqueue(pendingRender); UnacknowledgedRenderBatches.Enqueue(pendingRender);
} }
catch catch
{ {
@ -164,10 +164,10 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
// All the batches are sent in order based on the fact that SignalR // All the batches are sent in order based on the fact that SignalR
// provides ordering for the underlying messages and that the batches // provides ordering for the underlying messages and that the batches
// are always in order. // are always in order.
return Task.WhenAll(PendingRenderBatches.Select(b => WriteBatchBytesAsync(b))); return Task.WhenAll(UnacknowledgedRenderBatches.Select(b => WriteBatchBytesAsync(b)));
} }
private async Task WriteBatchBytesAsync(PendingRender pending) private async Task WriteBatchBytesAsync(UnacknowledgedRenderBatch pending)
{ {
// Send the render batch to the client // Send the render batch to the client
// If the "send" operation fails (synchronously or asynchronously) or the client // If the "send" operation fails (synchronously or asynchronously) or the client
@ -208,41 +208,67 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
return; return;
} }
// When clients send acks we know for sure they received and applied the batch.
// We send batches right away, and hold them in memory until we receive an ACK.
// If one or more client ACKs get lost (e.g., with long polling, client->server delivery is not guaranteed)
// we might receive an ack for a higher batch.
// We confirm all previous batches at that point (because receiving an ack is a guarantee
// from the client that it has received and successfully applied all batches up to that point).
// If we receive an ack for a previously acknowledged batch, we treat it as a duplicate: it is logged
// and otherwise ignored. Messages are guaranteed to be delivered in order, so a message for render
// batch 2 will never arrive after a message for render batch 3; an ack with an id lower than the
// next unacknowledged batch can therefore only refer to a batch that was already confirmed.
// For that reason it is enough to simply skip the message.
// A batch might get lost when we send it to the client, because the client might disconnect before receiving and processing it.
// In this case, once it reconnects the server will re-send any unacknowledged batches, some of which the
// client might have received and even believe it did send back an acknowledgement for. The client handles
// those by re-acknowledging.
// Even though we're not on the renderer sync context here, it's safe to assume ordered execution of the following // Even though we're not on the renderer sync context here, it's safe to assume ordered execution of the following
// line (i.e., matching the order in which we received batch completion messages) based on the fact that SignalR // line (i.e., matching the order in which we received batch completion messages) based on the fact that SignalR
// synchronizes calls to hub methods. That is, it won't issue more than one call to this method from the same hub // synchronizes calls to hub methods. That is, it won't issue more than one call to this method from the same hub
// at the same time on different threads. // at the same time on different threads.
if (!PendingRenderBatches.TryDequeue(out var entry))
if (!UnacknowledgedRenderBatches.TryPeek(out var nextUnacknowledgedBatch) || incomingBatchId < nextUnacknowledgedBatch.BatchId)
{ {
HandleException( Log.ReceivedDuplicateBatchAck(_logger, incomingBatchId);
new InvalidOperationException($"Received a notification for a rendered batch when not expecting it. Batch id '{incomingBatchId}'."));
} }
else else
{ {
entry.Data.Dispose(); var lastBatchId = nextUnacknowledgedBatch.BatchId;
// Order is important here so that we don't prematurely dequeue the last nextUnacknowledgedBatch
while (UnacknowledgedRenderBatches.TryPeek(out nextUnacknowledgedBatch) && nextUnacknowledgedBatch.BatchId <= incomingBatchId)
{
lastBatchId = nextUnacknowledgedBatch.BatchId;
UnacknowledgedRenderBatches.TryDequeue(out _);
ProcessPendingBatch(errorMessageOrNull, nextUnacknowledgedBatch);
}
if (entry.BatchId != incomingBatchId) if (lastBatchId < incomingBatchId)
{ {
HandleException( HandleException(
new InvalidOperationException($"Received a notification for a rendered batch when not expecting it. Batch id '{incomingBatchId}'.")); new InvalidOperationException($"Received an acknowledgement for batch with id '{incomingBatchId}' when the last batch produced was '{lastBatchId}'."));
} }
else
{
if (errorMessageOrNull == null)
{
Log.CompletingBatchWithoutError(_logger, entry.BatchId);
}
else
{
Log.CompletingBatchWithError(_logger, entry.BatchId, errorMessageOrNull);
}
}
CompleteRender(entry.CompletionSource, errorMessageOrNull);
} }
} }
/// <summary>
/// Finishes a single acknowledged render batch: logs the outcome, disposes the
/// batch's buffered data and completes its completion source.
/// </summary>
/// <param name="errorMessageOrNull">
/// The error message associated with the acknowledgement, or <c>null</c> when the
/// batch completed without error.
/// </param>
/// <param name="entry">The unacknowledged batch that is being completed.</param>
private void ProcessPendingBatch(string errorMessageOrNull, UnacknowledgedRenderBatch entry)
{
    var batchId = entry.BatchId;

    if (errorMessageOrNull != null)
    {
        Log.CompletingBatchWithError(_logger, batchId, errorMessageOrNull);
    }
    else
    {
        Log.CompletingBatchWithoutError(_logger, batchId);
    }

    // Release the buffered batch data before signalling completion.
    entry.Data.Dispose();
    CompleteRender(entry.CompletionSource, errorMessageOrNull);
}
private void CompleteRender(TaskCompletionSource<object> pendingRenderInfo, string errorMessageOrNull) private void CompleteRender(TaskCompletionSource<object> pendingRenderInfo, string errorMessageOrNull)
{ {
if (errorMessageOrNull == null) if (errorMessageOrNull == null)
@ -255,9 +281,9 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
} }
} }
internal readonly struct PendingRender internal readonly struct UnacknowledgedRenderBatch
{ {
public PendingRender(long batchId, ArrayBuilder<byte> data, TaskCompletionSource<object> completionSource) public UnacknowledgedRenderBatch(long batchId, ArrayBuilder<byte> data, TaskCompletionSource<object> completionSource)
{ {
BatchId = batchId; BatchId = batchId;
Data = data; Data = data;
@ -288,6 +314,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
private static readonly Action<ILogger, string, Exception> _sendBatchDataFailed; private static readonly Action<ILogger, string, Exception> _sendBatchDataFailed;
private static readonly Action<ILogger, long, string, Exception> _completingBatchWithError; private static readonly Action<ILogger, long, string, Exception> _completingBatchWithError;
private static readonly Action<ILogger, long, Exception> _completingBatchWithoutError; private static readonly Action<ILogger, long, Exception> _completingBatchWithoutError;
private static readonly Action<ILogger, long, Exception> _receivedDuplicateBatchAcknowledgement;
private static class EventIds private static class EventIds
{ {
@ -297,6 +324,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
public static readonly EventId SendBatchDataFailed = new EventId(103, "SendBatchDataFailed"); public static readonly EventId SendBatchDataFailed = new EventId(103, "SendBatchDataFailed");
public static readonly EventId CompletingBatchWithError = new EventId(104, "CompletingBatchWithError"); public static readonly EventId CompletingBatchWithError = new EventId(104, "CompletingBatchWithError");
public static readonly EventId CompletingBatchWithoutError = new EventId(105, "CompletingBatchWithoutError"); public static readonly EventId CompletingBatchWithoutError = new EventId(105, "CompletingBatchWithoutError");
public static readonly EventId ReceivedDuplicateBatchAcknowledgement = new EventId(106, "ReceivedDuplicateBatchAcknowledgement");
} }
static Log() static Log()
@ -330,6 +358,11 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
LogLevel.Debug, LogLevel.Debug,
EventIds.CompletingBatchWithoutError, EventIds.CompletingBatchWithoutError,
"Completing batch {BatchId} without error"); "Completing batch {BatchId} without error");
_receivedDuplicateBatchAcknowledgement = LoggerMessage.Define<long>(
LogLevel.Debug,
EventIds.ReceivedDuplicateBatchAcknowledgement,
"Received a duplicate ACK for batch id '{IncomingBatchId}'.");
} }
public static void SendBatchDataFailed(ILogger logger, Exception exception) public static void SendBatchDataFailed(ILogger logger, Exception exception)
@ -379,6 +412,11 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
batchId, batchId,
null); null);
} }
// Emits the debug-level "duplicate ACK" log event for the given batch id.
internal static void ReceivedDuplicateBatchAck(ILogger logger, long incomingBatchId)
    => _receivedDuplicateBatchAcknowledgement(logger, incomingBatchId, null);
} }
} }
} }

View File

@ -48,7 +48,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
component.TriggerRender(); component.TriggerRender();
// Assert // Assert
Assert.Equal(2, renderer.PendingRenderBatches.Count); Assert.Equal(2, renderer.UnacknowledgedRenderBatches.Count);
} }
[Fact] [Fact]
@ -115,7 +115,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
} }
[Fact] [Fact]
public async Task OnRenderCompletedAsync_ThrowsWhenNoBatchesAreQueued() public async Task OnRenderCompletedAsync_DoesNotThrowWhenReceivedDuplicateAcks()
{ {
// Arrange // Arrange
var serviceProvider = new ServiceCollection().BuildServiceProvider(); var serviceProvider = new ServiceCollection().BuildServiceProvider();
@ -152,7 +152,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
}; };
// This produces an additional batch (id = 3) // This produces an additional batch (id = 3)
trigger.TriggerRender(); trigger.TriggerRender();
var originallyQueuedBatches = renderer.PendingRenderBatches.Count; var originallyQueuedBatches = renderer.UnacknowledgedRenderBatches.Count;
// Act // Act
offlineClient.Transfer(onlineClient.Object, "new-connection"); offlineClient.Transfer(onlineClient.Object, "new-connection");
@ -163,19 +163,22 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
exceptions.Add(e); exceptions.Add(e);
}; };
// Pretend that we missed the ack for the initial batch // Receive the ack for the initial batch
renderer.OnRenderCompleted(2, null); renderer.OnRenderCompleted(2, null);
// Receive the ack for the second batch
renderer.OnRenderCompleted(3, null); renderer.OnRenderCompleted(3, null);
firstBatchTCS.SetResult(null); firstBatchTCS.SetResult(null);
secondBatchTCS.SetResult(null); secondBatchTCS.SetResult(null);
// Repeat the ack for the second batch (id = 3)
renderer.OnRenderCompleted(3, null); renderer.OnRenderCompleted(3, null);
// Assert // Assert
var exception = Assert.Single(exceptions); Assert.Empty(exceptions);
} }
[Fact] [Fact]
public async Task ThrowsIfWeReceiveAnOutOfSequenceClientAcknowledge() public async Task OnRenderCompletedAsync_DoesNotThrowWhenThereAreNoPendingBatchesToAck()
{ {
// Arrange // Arrange
var serviceProvider = new ServiceCollection().BuildServiceProvider(); var serviceProvider = new ServiceCollection().BuildServiceProvider();
@ -212,7 +215,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
}; };
// This produces an additional batch (id = 3) // This produces an additional batch (id = 3)
trigger.TriggerRender(); trigger.TriggerRender();
var originallyQueuedBatches = renderer.PendingRenderBatches.Count; var originallyQueuedBatches = renderer.UnacknowledgedRenderBatches.Count;
// Act // Act
offlineClient.Transfer(onlineClient.Object, "new-connection"); offlineClient.Transfer(onlineClient.Object, "new-connection");
@ -223,13 +226,133 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
exceptions.Add(e); exceptions.Add(e);
}; };
// Receive the ack for the initial batch
renderer.OnRenderCompleted(2, null);
// Receive a duplicate ack for the initial batch (id = 2)
renderer.OnRenderCompleted(2, null);
firstBatchTCS.SetResult(null);
secondBatchTCS.SetResult(null);
// Receive the ack for the second batch (id = 3)
renderer.OnRenderCompleted(3, null);
// Assert
Assert.Empty(exceptions);
}
[Fact]
public async Task ConsumesAllPendingBatchesWhenReceivingAHigherSequenceBatchId()
{
// Arrange
var serviceProvider = new ServiceCollection().BuildServiceProvider();
var firstBatchTCS = new TaskCompletionSource<object>();
var secondBatchTCS = new TaskCompletionSource<object>();
var renderIds = new List<long>();
var onlineClient = new Mock<IClientProxy>();
onlineClient.Setup(c => c.SendCoreAsync(It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback((string name, object[] value, CancellationToken token) => renderIds.Add((long)value[1]))
.Returns<string, object[], CancellationToken>((n, v, t) => (long)v[1] == 2 ? firstBatchTCS.Task : secondBatchTCS.Task);
var renderer = GetRemoteRenderer(serviceProvider, new CircuitClientProxy(onlineClient.Object, "online-client"));
RenderFragment initialContent = (builder) =>
{
builder.OpenElement(0, "my element");
builder.AddContent(1, "some text");
builder.CloseElement();
};
var trigger = new Trigger();
// This produces the initial batch (id = 2)
var result = await renderer.RenderComponentAsync<AutoParameterTestComponent>(
ParameterCollection.FromDictionary(new Dictionary<string, object>
{
[nameof(AutoParameterTestComponent.Content)] = initialContent,
[nameof(AutoParameterTestComponent.Trigger)] = trigger
}));
trigger.Component.Content = (builder) =>
{
builder.OpenElement(0, "offline element");
builder.AddContent(1, "offline text");
builder.CloseElement();
};
// This produces an additional batch (id = 3)
trigger.TriggerRender();
var originallyQueuedBatches = renderer.UnacknowledgedRenderBatches.Count;
// Act
var exceptions = new List<Exception>();
renderer.UnhandledException += (sender, e) =>
{
exceptions.Add(e);
};
// Pretend that we missed the ack for the initial batch // Pretend that we missed the ack for the initial batch
renderer.OnRenderCompleted(3, null); renderer.OnRenderCompleted(3, null);
firstBatchTCS.SetResult(null); firstBatchTCS.SetResult(null);
secondBatchTCS.SetResult(null); secondBatchTCS.SetResult(null);
// Assert
Assert.Empty(exceptions);
Assert.Empty(renderer.UnacknowledgedRenderBatches);
}
[Fact]
public async Task ThrowsIfWeReceivedAnAcknowledgeForANeverProducedBatch()
{
// Arrange
var serviceProvider = new ServiceCollection().BuildServiceProvider();
var firstBatchTCS = new TaskCompletionSource<object>();
var secondBatchTCS = new TaskCompletionSource<object>();
var renderIds = new List<long>();
var onlineClient = new Mock<IClientProxy>();
onlineClient.Setup(c => c.SendCoreAsync(It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback((string name, object[] value, CancellationToken token) => renderIds.Add((long)value[1]))
.Returns<string, object[], CancellationToken>((n, v, t) => (long)v[1] == 2 ? firstBatchTCS.Task : secondBatchTCS.Task);
var renderer = GetRemoteRenderer(serviceProvider, new CircuitClientProxy(onlineClient.Object, "online-client"));
RenderFragment initialContent = (builder) =>
{
builder.OpenElement(0, "my element");
builder.AddContent(1, "some text");
builder.CloseElement();
};
var trigger = new Trigger();
// This produces the initial batch (id = 2)
var result = await renderer.RenderComponentAsync<AutoParameterTestComponent>(
ParameterCollection.FromDictionary(new Dictionary<string, object>
{
[nameof(AutoParameterTestComponent.Content)] = initialContent,
[nameof(AutoParameterTestComponent.Trigger)] = trigger
}));
trigger.Component.Content = (builder) =>
{
builder.OpenElement(0, "offline element");
builder.AddContent(1, "offline text");
builder.CloseElement();
};
// This produces an additional batch (id = 3)
trigger.TriggerRender();
var originallyQueuedBatches = renderer.UnacknowledgedRenderBatches.Count;
// Act
var exceptions = new List<Exception>();
renderer.UnhandledException += (sender, e) =>
{
exceptions.Add(e);
};
renderer.OnRenderCompleted(4, null);
firstBatchTCS.SetResult(null);
secondBatchTCS.SetResult(null);
// Assert // Assert
var exception = Assert.Single(exceptions); var exception = Assert.Single(exceptions);
Assert.Equal(
"Received an acknowledgement for batch with id '4' when the last batch produced was '3'.",
exception.Message);
} }
[Fact] [Fact]
@ -249,7 +372,7 @@ namespace Microsoft.AspNetCore.Components.Web.Rendering
// Assert // Assert
Assert.Equal(0, first.ComponentId); Assert.Equal(0, first.ComponentId);
Assert.Equal(1, second.ComponentId); Assert.Equal(1, second.ComponentId);
Assert.Equal(2, renderer.PendingRenderBatches.Count); Assert.Equal(2, renderer.UnacknowledgedRenderBatches.Count);
} }
private RemoteRenderer GetRemoteRenderer(IServiceProvider serviceProvider, CircuitClientProxy circuitClientProxy) private RemoteRenderer GetRemoteRenderer(IServiceProvider serviceProvider, CircuitClientProxy circuitClientProxy)

File diff suppressed because one or more lines are too long

View File

@ -8,6 +8,8 @@ export class RenderQueue {
private nextBatchId = 2; private nextBatchId = 2;
private fatalError?: string;
public browserRendererId: number; public browserRendererId: number;
public logger: Logger; public logger: Logger;
@ -28,13 +30,23 @@ export class RenderQueue {
return newQueue; return newQueue;
} }
public processBatch(receivedBatchId: number, batchData: Uint8Array, connection: HubConnection): void { public async processBatch(receivedBatchId: number, batchData: Uint8Array, connection: HubConnection): Promise<void> {
if (receivedBatchId < this.nextBatchId) { if (receivedBatchId < this.nextBatchId) {
// SignalR delivers messages in order, but it does not guarantee that the message gets delivered.
// For that reason, if the server re-sends a batch (for example during a reconnection because it didn't get an ack)
// we simply acknowledge it to get back in sync with the server.
await this.completeBatch(connection, receivedBatchId);
this.logger.log(LogLevel.Debug, `Batch ${receivedBatchId} already processed. Waiting for batch ${this.nextBatchId}.`); this.logger.log(LogLevel.Debug, `Batch ${receivedBatchId} already processed. Waiting for batch ${this.nextBatchId}.`);
return; return;
} }
if (receivedBatchId > this.nextBatchId) { if (receivedBatchId > this.nextBatchId) {
if (this.fatalError) {
this.logger.log(LogLevel.Debug, `Received a new batch ${receivedBatchId} but errored out on a previous batch ${this.nextBatchId - 1}`);
await connection.send('OnRenderCompleted', this.nextBatchId - 1, this.fatalError.toString());
return;
}
this.logger.log(LogLevel.Debug, `Waiting for batch ${this.nextBatchId}. Batch ${receivedBatchId} not processed.`); this.logger.log(LogLevel.Debug, `Waiting for batch ${this.nextBatchId}. Batch ${receivedBatchId} not processed.`);
return; return;
} }
@ -43,8 +55,9 @@ export class RenderQueue {
this.nextBatchId++; this.nextBatchId++;
this.logger.log(LogLevel.Debug, `Applying batch ${receivedBatchId}.`); this.logger.log(LogLevel.Debug, `Applying batch ${receivedBatchId}.`);
renderBatch(this.browserRendererId, new OutOfProcessRenderBatch(batchData)); renderBatch(this.browserRendererId, new OutOfProcessRenderBatch(batchData));
this.completeBatch(connection, receivedBatchId); await this.completeBatch(connection, receivedBatchId);
} catch (error) { } catch (error) {
this.fatalError = error.toString();
this.logger.log(LogLevel.Error, `There was an error applying batch ${receivedBatchId}.`); this.logger.log(LogLevel.Error, `There was an error applying batch ${receivedBatchId}.`);
// If there's a rendering exception, notify server *and* throw on client // If there's a rendering exception, notify server *and* throw on client