Don't release SemaphoreSlim when it is canceled (#12818)
* Don't release SemaphoreSlim when it is cancelled * fixed tests * Rebased * Updated ref * mark test as flaky
This commit is contained in:
parent
1f7d59d8f1
commit
c605d6cf63
|
|
@ -97,6 +97,7 @@ namespace Microsoft.AspNetCore.Components
|
|||
public abstract partial class Dispatcher
|
||||
{
|
||||
protected Dispatcher() { }
|
||||
public void AssertAccess() { }
|
||||
public abstract bool CheckAccess();
|
||||
public static Microsoft.AspNetCore.Components.Dispatcher CreateDefault() { throw null; }
|
||||
public abstract System.Threading.Tasks.Task InvokeAsync(System.Action workItem);
|
||||
|
|
|
|||
|
|
@ -97,6 +97,7 @@ namespace Microsoft.AspNetCore.Components
|
|||
public abstract partial class Dispatcher
|
||||
{
|
||||
protected Dispatcher() { }
|
||||
public void AssertAccess() { }
|
||||
public abstract bool CheckAccess();
|
||||
public static Microsoft.AspNetCore.Components.Dispatcher CreateDefault() { throw null; }
|
||||
public abstract System.Threading.Tasks.Task InvokeAsync(System.Action workItem);
|
||||
|
|
|
|||
|
|
@ -23,6 +23,20 @@ namespace Microsoft.AspNetCore.Components
|
|||
/// </summary>
|
||||
internal event UnhandledExceptionEventHandler UnhandledException;
|
||||
|
||||
/// <summary>
|
||||
/// Validates that the currently executing code is running inside the dispatcher.
|
||||
/// </summary>
|
||||
public void AssertAccess()
|
||||
{
|
||||
if (!CheckAccess())
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
"The current thread is not associated with the Dispatcher. " +
|
||||
"Use InvokeAsync() to switch execution to the Dispatcher when " +
|
||||
"triggering rendering or component state.");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a value that determines whether using the dispatcher to invoke a work item is required
|
||||
/// from the current context.
|
||||
|
|
|
|||
|
|
@ -209,7 +209,7 @@ namespace Microsoft.AspNetCore.Components.Rendering
|
|||
/// </returns>
|
||||
public virtual Task DispatchEventAsync(ulong eventHandlerId, EventFieldInfo fieldInfo, EventArgs eventArgs)
|
||||
{
|
||||
EnsureSynchronizationContext();
|
||||
Dispatcher.AssertAccess();
|
||||
|
||||
if (!_eventBindings.TryGetValue(eventHandlerId, out var callback))
|
||||
{
|
||||
|
|
@ -337,7 +337,7 @@ namespace Microsoft.AspNetCore.Components.Rendering
|
|||
/// <param name="renderFragment">A <see cref="RenderFragment"/> that will supply the updated UI contents.</param>
|
||||
internal void AddToRenderQueue(int componentId, RenderFragment renderFragment)
|
||||
{
|
||||
EnsureSynchronizationContext();
|
||||
Dispatcher.AssertAccess();
|
||||
|
||||
var componentState = GetOptionalComponentState(componentId);
|
||||
if (componentState == null)
|
||||
|
|
@ -374,21 +374,6 @@ namespace Microsoft.AspNetCore.Components.Rendering
|
|||
return eventHandlerId;
|
||||
}
|
||||
|
||||
private void EnsureSynchronizationContext()
|
||||
{
|
||||
// Render operations are not thread-safe, so they need to be serialized by the dispatcher.
|
||||
// Plus, any other logic that mutates state accessed during rendering also
|
||||
// needs not to run concurrently with rendering so should be dispatched to
|
||||
// the renderer's sync context.
|
||||
if (!Dispatcher.CheckAccess())
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
"The current thread is not associated with the Dispatcher. " +
|
||||
"Use Invoke() or InvokeAsync() to switch execution to the Dispatcher when " +
|
||||
"triggering rendering or modifying any state accessed during rendering.");
|
||||
}
|
||||
}
|
||||
|
||||
private ComponentState GetRequiredComponentState(int componentId)
|
||||
=> _componentStateById.TryGetValue(componentId, out var componentState)
|
||||
? componentState
|
||||
|
|
@ -414,7 +399,7 @@ namespace Microsoft.AspNetCore.Components.Rendering
|
|||
|
||||
private void ProcessRenderQueue()
|
||||
{
|
||||
EnsureSynchronizationContext();
|
||||
Dispatcher.AssertAccess();
|
||||
|
||||
if (_isBatchInProgress)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
|
|||
{
|
||||
internal class CircuitHost : IAsyncDisposable
|
||||
{
|
||||
private readonly SemaphoreSlim HandlerLock = new SemaphoreSlim(1);
|
||||
private readonly IServiceScope _scope;
|
||||
private readonly CircuitOptions _options;
|
||||
private readonly CircuitHandler[] _circuitHandlers;
|
||||
|
|
@ -187,35 +186,28 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
|
|||
{
|
||||
Log.CircuitOpened(_logger, Circuit.Id);
|
||||
|
||||
await HandlerLock.WaitAsync(cancellationToken);
|
||||
Renderer.Dispatcher.AssertAccess();
|
||||
|
||||
try
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
{
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
await circuitHandler.OnCircuitOpenedAsync(Circuit, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnCircuitOpenedAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
await circuitHandler.OnCircuitOpenedAsync(Circuit, cancellationToken);
|
||||
}
|
||||
|
||||
if (exceptions != null)
|
||||
catch (Exception ex)
|
||||
{
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnCircuitOpenedAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
||||
if (exceptions != null)
|
||||
{
|
||||
HandlerLock.Release();
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -223,35 +215,28 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
|
|||
{
|
||||
Log.ConnectionUp(_logger, Circuit.Id, Client.ConnectionId);
|
||||
|
||||
await HandlerLock.WaitAsync(cancellationToken);
|
||||
Renderer.Dispatcher.AssertAccess();
|
||||
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
try
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
{
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
await circuitHandler.OnConnectionUpAsync(Circuit, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnConnectionUpAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
await circuitHandler.OnConnectionUpAsync(Circuit, cancellationToken);
|
||||
}
|
||||
|
||||
if (exceptions != null)
|
||||
catch (Exception ex)
|
||||
{
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnConnectionUpAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
||||
if (exceptions != null)
|
||||
{
|
||||
HandlerLock.Release();
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -259,35 +244,28 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
|
|||
{
|
||||
Log.ConnectionDown(_logger, Circuit.Id, Client.ConnectionId);
|
||||
|
||||
await HandlerLock.WaitAsync(cancellationToken);
|
||||
Renderer.Dispatcher.AssertAccess();
|
||||
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
try
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
{
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
await circuitHandler.OnConnectionDownAsync(Circuit, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnConnectionDownAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
await circuitHandler.OnConnectionDownAsync(Circuit, cancellationToken);
|
||||
}
|
||||
|
||||
if (exceptions != null)
|
||||
catch (Exception ex)
|
||||
{
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnConnectionDownAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
||||
if (exceptions != null)
|
||||
{
|
||||
HandlerLock.Release();
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -295,35 +273,26 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
|
|||
{
|
||||
Log.CircuitClosed(_logger, Circuit.Id);
|
||||
|
||||
await HandlerLock.WaitAsync(cancellationToken);
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
try
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
{
|
||||
List<Exception> exceptions = null;
|
||||
|
||||
for (var i = 0; i < _circuitHandlers.Length; i++)
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
var circuitHandler = _circuitHandlers[i];
|
||||
try
|
||||
{
|
||||
await circuitHandler.OnCircuitClosedAsync(Circuit, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnCircuitClosedAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
await circuitHandler.OnCircuitClosedAsync(Circuit, cancellationToken);
|
||||
}
|
||||
|
||||
if (exceptions != null)
|
||||
catch (Exception ex)
|
||||
{
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
Log.CircuitHandlerFailed(_logger, circuitHandler, nameof(CircuitHandler.OnCircuitClosedAsync), ex);
|
||||
exceptions ??= new List<Exception>();
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
||||
if (exceptions != null)
|
||||
{
|
||||
HandlerLock.Release();
|
||||
throw new AggregateException("Encountered exceptions while executing circuit handlers.", exceptions);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -270,46 +270,6 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
|
|||
Assert.False(registry.DisconnectedCircuits.TryGetValue(circuitHost.CircuitId, out _));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Connect_WhileDisconnectIsInProgress_SeriallyExecutesCircuitHandlers()
|
||||
{
|
||||
// Arrange
|
||||
var circuitIdFactory = TestCircuitIdFactory.CreateTestFactory();
|
||||
|
||||
var registry = new TestCircuitRegistry(circuitIdFactory);
|
||||
registry.BeforeDisconnect = new ManualResetEventSlim();
|
||||
// This verifies that connection up \ down events on a circuit handler are always invoked serially.
|
||||
var circuitHandler = new SerialCircuitHandler();
|
||||
var tcs = new TaskCompletionSource<int>();
|
||||
|
||||
var circuitHost = TestCircuitHost.Create(circuitIdFactory.CreateCircuitId(), handlers: new[] { circuitHandler });
|
||||
registry.Register(circuitHost);
|
||||
var client = Mock.Of<IClientProxy>();
|
||||
var newId = "new-connection";
|
||||
|
||||
// Act
|
||||
var disconnect = Task.Run(() =>
|
||||
{
|
||||
var task = registry.DisconnectAsync(circuitHost, circuitHost.Client.ConnectionId);
|
||||
tcs.SetResult(0);
|
||||
return task;
|
||||
});
|
||||
var connect = Task.Run(async () =>
|
||||
{
|
||||
registry.BeforeDisconnect.Set();
|
||||
await tcs.Task;
|
||||
await registry.ConnectAsync(circuitHost.CircuitId, client, newId, default);
|
||||
});
|
||||
await Task.WhenAll(disconnect, connect);
|
||||
|
||||
// Assert
|
||||
Assert.Single(registry.ConnectedCircuits.Values);
|
||||
Assert.False(registry.DisconnectedCircuits.TryGetValue(circuitHost.CircuitId, out _));
|
||||
|
||||
Assert.True(circuitHandler.OnConnectionDownExecuted, "OnConnectionDownAsync should have been executed.");
|
||||
Assert.True(circuitHandler.OnConnectionUpExecuted, "OnConnectionUpAsync should have been executed.");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisconnectWhenAConnectIsInProgress()
|
||||
{
|
||||
|
|
@ -444,37 +404,5 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
|
|||
NullLogger<CircuitRegistry>.Instance,
|
||||
factory ?? TestCircuitIdFactory.CreateTestFactory());
|
||||
}
|
||||
|
||||
private class SerialCircuitHandler : CircuitHandler
|
||||
{
|
||||
private readonly SemaphoreSlim _sempahore = new SemaphoreSlim(1);
|
||||
|
||||
public bool OnConnectionUpExecuted { get; private set; }
|
||||
public bool OnConnectionDownExecuted { get; private set; }
|
||||
|
||||
public override async Task OnConnectionUpAsync(Circuit circuit, CancellationToken cancellationToken)
|
||||
{
|
||||
Assert.True(await _sempahore.WaitAsync(0), "This should be serialized and consequently without contention");
|
||||
await Task.Delay(10);
|
||||
|
||||
Assert.False(OnConnectionUpExecuted);
|
||||
Assert.True(OnConnectionDownExecuted);
|
||||
OnConnectionUpExecuted = true;
|
||||
|
||||
_sempahore.Release();
|
||||
}
|
||||
|
||||
public override async Task OnConnectionDownAsync(Circuit circuit, CancellationToken cancellationToken)
|
||||
{
|
||||
Assert.True(await _sempahore.WaitAsync(0), "This should be serialized and consequently without contention");
|
||||
await Task.Delay(10);
|
||||
|
||||
Assert.False(OnConnectionUpExecuted);
|
||||
Assert.False(OnConnectionDownExecuted);
|
||||
OnConnectionDownExecuted = true;
|
||||
|
||||
_sempahore.Release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -264,6 +264,7 @@ namespace Microsoft.AspNetCore.Components.E2ETest.ServerExecutionTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
[Flaky("https://github.com/aspnet/AspNetCore/issues/13086", FlakyOn.AzP.Windows)]
|
||||
public async Task ContinuesWorkingAfterInvalidAsyncReturnCallback()
|
||||
{
|
||||
// Arrange
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ namespace Microsoft.AspNetCore.Components.E2ETest.ServerExecutionTests
|
|||
appElement.FindElement(By.Id("run-without-dispatch")).Click();
|
||||
|
||||
Browser.Contains(
|
||||
$"{typeof(InvalidOperationException).FullName}: The current thread is not associated with the Dispatcher. Use Invoke() or InvokeAsync() to switch execution to the Dispatcher when triggering rendering or modifying any state accessed during rendering.",
|
||||
$"{typeof(InvalidOperationException).FullName}: The current thread is not associated with the Dispatcher. Use InvokeAsync() to switch execution to the Dispatcher when triggering rendering or component state.",
|
||||
() => result.Text);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue