Merged PR 3115: Wait to Complete Pipe
Fixed the `PipeWriterStream` to properly detect a canceled write and throw an `OperationCanceledException` in those cases. And making sure `Complete` is called in a safe manner by ensuring it is in a lock so writes can't be in-progress.
This commit is contained in:
parent
f198e559f8
commit
7fd42c4e94
|
|
@ -44,4 +44,10 @@ Later on, this will be checked using this condition:
|
|||
Microsoft.AspNetCore.CookiePolicy;
|
||||
</PackagesInPatch>
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition=" '$(VersionPrefix)' == '2.1.14' ">
|
||||
<PackagesInPatch>
|
||||
Microsoft.AspNetCore.Http.Connections;
|
||||
Microsoft.AspNetCore.SignalR.Core;
|
||||
</PackagesInPatch>
|
||||
</PropertyGroup>
|
||||
</Project>
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
import { ChildProcess, spawn } from "child_process";
|
||||
import * as fs from "fs";
|
||||
import * as _fs from "fs";
|
||||
import { EOL } from "os";
|
||||
import * as path from "path";
|
||||
import { promisify } from "util";
|
||||
import { PassThrough, Readable } from "stream";
|
||||
|
||||
import { run } from "../../webdriver-tap-runner/lib";
|
||||
|
|
@ -9,6 +10,16 @@ import { run } from "../../webdriver-tap-runner/lib";
|
|||
import * as _debug from "debug";
|
||||
const debug = _debug("signalr-functional-tests:run");
|
||||
|
||||
const ARTIFACTS_DIR = path.resolve(__dirname, "..", "..", "..", "..", "artifacts");
|
||||
const LOGS_DIR = path.resolve(ARTIFACTS_DIR, "logs");
|
||||
|
||||
// Promisify things from fs we want to use.
|
||||
const fs = {
|
||||
createWriteStream: _fs.createWriteStream,
|
||||
exists: promisify(_fs.exists),
|
||||
mkdir: promisify(_fs.mkdir),
|
||||
};
|
||||
|
||||
process.on("unhandledRejection", (reason) => {
|
||||
console.error(`Unhandled promise rejection: ${reason}`);
|
||||
process.exit(1);
|
||||
|
|
@ -102,6 +113,13 @@ if (chromePath) {
|
|||
try {
|
||||
const serverPath = path.resolve(__dirname, "..", "bin", configuration, "netcoreapp2.1", "FunctionalTests.dll");
|
||||
|
||||
if (!await fs.exists(ARTIFACTS_DIR)) {
|
||||
await fs.mkdir(ARTIFACTS_DIR);
|
||||
}
|
||||
if (!await fs.exists(LOGS_DIR)) {
|
||||
await fs.mkdir(LOGS_DIR);
|
||||
}
|
||||
|
||||
debug(`Launching Functional Test Server: ${serverPath}`);
|
||||
const dotnet = spawn("dotnet", [serverPath], {
|
||||
env: {
|
||||
|
|
@ -117,6 +135,9 @@ if (chromePath) {
|
|||
}
|
||||
}
|
||||
|
||||
const logStream = fs.createWriteStream(path.resolve(LOGS_DIR, "ts.functionaltests.dotnet.log"));
|
||||
dotnet.stdout.pipe(logStream);
|
||||
|
||||
process.on("SIGINT", cleanup);
|
||||
process.on("exit", cleanup);
|
||||
|
||||
|
|
|
|||
|
|
@ -274,13 +274,35 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
|
|||
// Cancel any pending flushes from back pressure
|
||||
Application?.Output.CancelPendingFlush();
|
||||
|
||||
// Shutdown both sides and wait for nothing
|
||||
// Normally it isn't safe to try and acquire this lock because the Send can hold onto it for a long time if there is backpressure
|
||||
// It is safe to wait for this lock now because the Send will be in one of 4 states
|
||||
// 1. In the middle of a write which is in the middle of being canceled by the CancelPendingFlush above, when it throws
|
||||
// an OperationCanceledException it will complete the PipeWriter which will make any other Send waiting on the lock
|
||||
// throw an InvalidOperationException if they call Write
|
||||
// 2. About to write and see that there is a pending cancel from the CancelPendingFlush, go to 1 to see what happens
|
||||
// 3. Enters the Send and sees the Dispose state from DisposeAndRemoveAsync and releases the lock
|
||||
// 4. No Send in progress
|
||||
await WriteLock.WaitAsync();
|
||||
try
|
||||
{
|
||||
// Complete the applications read loop
|
||||
Application?.Output.Complete(transportTask.Exception?.InnerException);
|
||||
}
|
||||
finally
|
||||
{
|
||||
WriteLock.Release();
|
||||
}
|
||||
|
||||
Log.WaitingForTransportAndApplication(_logger, TransportType);
|
||||
|
||||
// Wait for application so we can complete the writer safely
|
||||
await applicationTask.NoThrow();
|
||||
|
||||
// Shutdown application side now that it's finished
|
||||
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
|
||||
Application?.Output.Complete(transportTask.Exception?.InnerException);
|
||||
|
||||
try
|
||||
{
|
||||
Log.WaitingForTransportAndApplication(_logger, TransportType);
|
||||
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
|
||||
await Task.WhenAll(applicationTask, transportTask);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -511,6 +511,14 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
|
|||
|
||||
context.Response.StatusCode = StatusCodes.Status404NotFound;
|
||||
context.Response.ContentType = "text/plain";
|
||||
|
||||
// There are no writes anymore (since this is the write "loop")
|
||||
// So it is safe to complete the writer
|
||||
// We complete the writer here because we already have the WriteLock acquired
|
||||
// and it's unsafe to complete outside of the lock
|
||||
// Other code isn't guaranteed to be able to acquire the lock before another write
|
||||
// even if CancelPendingFlush is called, and the other write could hang if there is backpressure
|
||||
connection.Application.Output.Complete();
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -549,11 +557,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
|
|||
|
||||
Log.TerminatingConection(_logger);
|
||||
|
||||
// Complete the receiving end of the pipe
|
||||
connection.Application.Output.Complete();
|
||||
|
||||
// Dispose the connection gracefully, but don't wait for it. We assign it here so we can wait in tests
|
||||
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
|
||||
// Dispose the connection, but don't wait for it. We assign it here so we can wait in tests
|
||||
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
|
||||
|
||||
context.Response.StatusCode = StatusCodes.Status202Accepted;
|
||||
context.Response.ContentType = "text/plain";
|
||||
|
|
|
|||
|
|
@ -0,0 +1,27 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.Runtime.CompilerServices;
|
||||
|
||||
namespace System.Threading.Tasks
|
||||
{
|
||||
internal static class TaskExtensions
|
||||
{
|
||||
public static async Task NoThrow(this Task task)
|
||||
{
|
||||
await new NoThrowAwaiter(task);
|
||||
}
|
||||
}
|
||||
|
||||
internal readonly struct NoThrowAwaiter : ICriticalNotifyCompletion
|
||||
{
|
||||
private readonly Task _task;
|
||||
public NoThrowAwaiter(Task task) { _task = task; }
|
||||
public NoThrowAwaiter GetAwaiter() => this;
|
||||
public bool IsCompleted => _task.IsCompleted;
|
||||
// Observe exception
|
||||
public void GetResult() { _ = _task.Exception; }
|
||||
public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
|
||||
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
|
|
@ -76,7 +76,15 @@ namespace System.IO.Pipelines
|
|||
|
||||
_length += source.Length;
|
||||
var task = _pipeWriter.WriteAsync(source);
|
||||
if (!task.IsCompletedSuccessfully)
|
||||
if (task.IsCompletedSuccessfully)
|
||||
{
|
||||
// Cancellation can be triggered by PipeWriter.CancelPendingFlush
|
||||
if (task.Result.IsCanceled)
|
||||
{
|
||||
throw new OperationCanceledException();
|
||||
}
|
||||
}
|
||||
else if (!task.IsCompletedSuccessfully)
|
||||
{
|
||||
return WriteSlowAsync(task);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
|
||||
private long _lastSendTimestamp = Stopwatch.GetTimestamp();
|
||||
private ReadOnlyMemory<byte> _cachedPingMessage;
|
||||
private volatile bool _connectionAborted;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
|
||||
|
|
@ -99,6 +100,12 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
return new ValueTask(WriteSlowAsync(message));
|
||||
}
|
||||
|
||||
if (_connectionAborted)
|
||||
{
|
||||
_writeLock.Release();
|
||||
return default;
|
||||
}
|
||||
|
||||
// This method should never throw synchronously
|
||||
var task = WriteCore(message);
|
||||
|
||||
|
|
@ -129,6 +136,12 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
return new ValueTask(WriteSlowAsync(message));
|
||||
}
|
||||
|
||||
if (_connectionAborted)
|
||||
{
|
||||
_writeLock.Release();
|
||||
return default;
|
||||
}
|
||||
|
||||
// This method should never throw synchronously
|
||||
var task = WriteCore(message);
|
||||
|
||||
|
|
@ -158,6 +171,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
|
||||
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
|
||||
}
|
||||
}
|
||||
|
|
@ -175,6 +190,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
|
||||
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
|
||||
}
|
||||
}
|
||||
|
|
@ -188,6 +205,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
catch (Exception ex)
|
||||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -201,6 +220,11 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
await _writeLock.WaitAsync();
|
||||
try
|
||||
{
|
||||
if (_connectionAborted)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Failed to get the lock immediately when entering WriteAsync so await until it is available
|
||||
|
||||
await WriteCore(message);
|
||||
|
|
@ -208,6 +232,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
catch (Exception ex)
|
||||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -219,6 +245,11 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
try
|
||||
{
|
||||
if (_connectionAborted)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Failed to get the lock immediately when entering WriteAsync so await until it is available
|
||||
await _writeLock.WaitAsync();
|
||||
|
||||
|
|
@ -227,6 +258,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
catch (Exception ex)
|
||||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -250,6 +283,11 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
try
|
||||
{
|
||||
if (_connectionAborted)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await _connectionContext.Transport.Output.WriteAsync(_cachedPingMessage);
|
||||
|
||||
Log.SentPing(_logger);
|
||||
|
|
@ -257,6 +295,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
catch (Exception ex)
|
||||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -293,6 +333,12 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
/// </summary>
|
||||
public virtual void Abort()
|
||||
{
|
||||
_connectionAborted = true;
|
||||
|
||||
// Cancel any current writes or writes that are about to happen and have already gone past the _connectionAborted bool
|
||||
// We have to do this outside of the lock otherwise it could hang if the write is observing backpressure
|
||||
_connectionContext.Transport.Output.CancelPendingFlush();
|
||||
|
||||
// If we already triggered the token then noop, this isn't thread safe but it's good enough
|
||||
// to avoid spawning a new task in the most common cases
|
||||
if (_connectionAbortedTokenSource.IsCancellationRequested)
|
||||
|
|
@ -423,9 +469,24 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
internal Task AbortAsync()
|
||||
{
|
||||
Abort();
|
||||
|
||||
// Acquire lock to make sure all writes are completed
|
||||
if (!_writeLock.Wait(0))
|
||||
{
|
||||
return AbortAsyncSlow();
|
||||
}
|
||||
|
||||
_writeLock.Release();
|
||||
return _abortCompletedTcs.Task;
|
||||
}
|
||||
|
||||
private async Task AbortAsyncSlow()
|
||||
{
|
||||
await _writeLock.WaitAsync();
|
||||
_writeLock.Release();
|
||||
await _abortCompletedTcs.Task;
|
||||
}
|
||||
|
||||
private void KeepAliveTick()
|
||||
{
|
||||
var timestamp = Stopwatch.GetTimestamp();
|
||||
|
|
|
|||
|
|
@ -79,9 +79,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
{
|
||||
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
|
||||
|
||||
await client.InvokeAsync(nameof(AbortHub.Kill));
|
||||
await client.SendInvocationAsync(nameof(AbortHub.Kill));
|
||||
|
||||
await connectionHandlerTask.OrTimeout();
|
||||
|
||||
Assert.Null(client.TryRead());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue