Merge in 'release/5.0' changes
This commit is contained in:
commit
8413e34127
|
|
@ -172,13 +172,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
protected ValueTask<ReadResult> StartTimingReadAsync(ValueTask<ReadResult> readAwaitable, CancellationToken cancellationToken)
|
||||
{
|
||||
|
||||
if (!readAwaitable.IsCompleted && _timingEnabled)
|
||||
if (!readAwaitable.IsCompleted)
|
||||
{
|
||||
TryProduceContinue();
|
||||
|
||||
_backpressure = true;
|
||||
_context.TimeoutControl.StartTimingRead();
|
||||
if (_timingEnabled)
|
||||
{
|
||||
_backpressure = true;
|
||||
_context.TimeoutControl.StartTimingRead();
|
||||
}
|
||||
}
|
||||
|
||||
return readAwaitable;
|
||||
|
|
|
|||
|
|
@ -47,8 +47,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
private static readonly Action<ILogger, Exception> _notAllConnectionsAborted =
|
||||
LoggerMessage.Define(LogLevel.Debug, new EventId(21, nameof(NotAllConnectionsAborted)), "Some connections failed to abort during server shutdown.");
|
||||
|
||||
private static readonly Action<ILogger, TimeSpan, TimeSpan, DateTimeOffset, Exception> _heartbeatSlow =
|
||||
LoggerMessage.Define<TimeSpan, TimeSpan, DateTimeOffset>(LogLevel.Warning, new EventId(22, nameof(HeartbeatSlow)), @"As of ""{now}"", the heartbeat has been running for ""{heartbeatDuration}"" which is longer than ""{interval}"". This could be caused by thread pool starvation.");
|
||||
private static readonly Action<ILogger, DateTimeOffset, TimeSpan, TimeSpan, Exception> _heartbeatSlow =
|
||||
LoggerMessage.Define<DateTimeOffset, TimeSpan, TimeSpan>(LogLevel.Warning, new EventId(22, "HeartbeatSlow"), @"As of ""{now}"", the heartbeat has been running for ""{heartbeatDuration}"" which is longer than ""{interval}"". This could be caused by thread pool starvation.");
|
||||
|
||||
private static readonly Action<ILogger, string, Exception> _applicationNeverCompleted =
|
||||
LoggerMessage.Define<string>(LogLevel.Critical, new EventId(23, nameof(ApplicationNeverCompleted)), @"Connection id ""{ConnectionId}"" application never completed");
|
||||
|
|
@ -196,7 +196,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
|
||||
public virtual void HeartbeatSlow(TimeSpan heartbeatDuration, TimeSpan interval, DateTimeOffset now)
|
||||
{
|
||||
_heartbeatSlow(_logger, heartbeatDuration, interval, now, null);
|
||||
_heartbeatSlow(_logger, now, heartbeatDuration, interval, null);
|
||||
}
|
||||
|
||||
public virtual void ApplicationNeverCompleted(string connectionId)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
|
@ -22,15 +23,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public async Task HeartbeatTakingLongerThanIntervalIsLoggedAsError()
|
||||
public async Task HeartbeatTakingLongerThanIntervalIsLoggedAsWarning()
|
||||
{
|
||||
var systemClock = new MockSystemClock();
|
||||
var heartbeatHandler = new Mock<IHeartbeatHandler>();
|
||||
var debugger = new Mock<IDebugger>();
|
||||
var kestrelTrace = new Mock<IKestrelTrace>();
|
||||
var kestrelTrace = new TestKestrelTrace();
|
||||
var handlerMre = new ManualResetEventSlim();
|
||||
var handlerStartedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var now = systemClock.UtcNow;
|
||||
var heartbeatDuration = TimeSpan.FromSeconds(2);
|
||||
|
||||
heartbeatHandler.Setup(h => h.OnHeartbeat(now)).Callback(() =>
|
||||
{
|
||||
|
|
@ -41,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
|
||||
Task blockedHeartbeatTask;
|
||||
|
||||
using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace.Object))
|
||||
using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace))
|
||||
{
|
||||
blockedHeartbeatTask = Task.Run(() => heartbeat.OnHeartbeat());
|
||||
|
||||
|
|
@ -56,11 +58,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
await blockedHeartbeatTask.DefaultTimeout();
|
||||
|
||||
heartbeatHandler.Verify(h => h.OnHeartbeat(now), Times.Once());
|
||||
kestrelTrace.Verify(t => t.HeartbeatSlow(TimeSpan.FromSeconds(2), Heartbeat.Interval, now), Times.Once());
|
||||
|
||||
var warningMessage = kestrelTrace.Logger.Messages.Single(message => message.LogLevel == LogLevel.Warning).Message;
|
||||
Assert.Equal($"As of \"{now.ToString(CultureInfo.InvariantCulture)}\", the heartbeat has been running for "
|
||||
+ $"\"{heartbeatDuration.ToString("c", CultureInfo.InvariantCulture)}\" which is longer than "
|
||||
+ $"\"{Heartbeat.Interval.ToString("c", CultureInfo.InvariantCulture)}\". "
|
||||
+ "This could be caused by thread pool starvation.", warningMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task HeartbeatTakingLongerThanIntervalIsNotLoggedAsErrorIfDebuggerAttached()
|
||||
public async Task HeartbeatTakingLongerThanIntervalIsNotLoggedIfDebuggerAttached()
|
||||
{
|
||||
var systemClock = new MockSystemClock();
|
||||
var heartbeatHandler = new Mock<IHeartbeatHandler>();
|
||||
|
|
|
|||
|
|
@ -846,6 +846,42 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
|||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Expect100ContinueHonoredWhenMinRequestBodyDataRateIsDisabled()
|
||||
{
|
||||
var testContext = new TestServiceContext(LoggerFactory);
|
||||
|
||||
// This may seem unrelated, but this is a regression test for
|
||||
// https://github.com/dotnet/aspnetcore/issues/30449
|
||||
testContext.ServerOptions.Limits.MinRequestBodyDataRate = null;
|
||||
|
||||
await using (var server = new TestServer(TestApp.EchoAppChunked, testContext))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
{
|
||||
await connection.Send(
|
||||
"POST / HTTP/1.1",
|
||||
"Host:",
|
||||
"Expect: 100-continue",
|
||||
"Connection: close",
|
||||
"Content-Length: 11",
|
||||
"\r\n");
|
||||
await connection.Receive(
|
||||
"HTTP/1.1 100 Continue",
|
||||
"",
|
||||
"");
|
||||
await connection.Send("Hello World");
|
||||
await connection.ReceiveEnd(
|
||||
"HTTP/1.1 200 OK",
|
||||
"Connection: close",
|
||||
$"Date: {testContext.DateHeaderValue}",
|
||||
"Content-Length: 11",
|
||||
"",
|
||||
"Hello World");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ZeroContentLengthAssumedOnNonKeepAliveRequestsWithoutContentLengthOrTransferEncodingHeader()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ export class HttpConnection implements IConnection {
|
|||
private transport?: ITransport;
|
||||
private startInternalPromise?: Promise<void>;
|
||||
private stopPromise?: Promise<void>;
|
||||
private stopPromiseResolver!: (value?: PromiseLike<void>) => void;
|
||||
private stopPromiseResolver: (value?: PromiseLike<void>) => void = () => {};
|
||||
private stopError?: Error;
|
||||
private accessTokenFactory?: () => string | Promise<string>;
|
||||
private sendQueue?: TransportSendQueue;
|
||||
|
|
@ -214,7 +214,6 @@ export class HttpConnection implements IConnection {
|
|||
this.transport = undefined;
|
||||
} else {
|
||||
this.logger.log(LogLevel.Debug, "HttpConnection.transport is undefined in HttpConnection.stop() because start() failed.");
|
||||
this.stopConnection();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -294,6 +293,9 @@ export class HttpConnection implements IConnection {
|
|||
this.logger.log(LogLevel.Error, "Failed to start the connection: " + e);
|
||||
this.connectionState = ConnectionState.Disconnected;
|
||||
this.transport = undefined;
|
||||
|
||||
// if start fails, any active calls to stop assume that start will complete the stop promise
|
||||
this.stopPromiseResolver();
|
||||
return Promise.reject(e);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -766,7 +766,11 @@ export class HubConnection {
|
|||
this.logger.log(LogLevel.Information, `Reconnect attempt failed because of error '${e}'.`);
|
||||
|
||||
if (this.connectionState !== HubConnectionState.Reconnecting) {
|
||||
this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect attempt. Done reconnecting.");
|
||||
this.logger.log(LogLevel.Debug, `Connection moved to the '${this.connectionState}' from the reconnecting state during reconnect attempt. Done reconnecting.`);
|
||||
// The TypeScript compiler thinks that connectionState must be Connected here. The TypeScript compiler is wrong.
|
||||
if (this.connectionState as any === HubConnectionState.Disconnecting) {
|
||||
this.completeClose();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -134,23 +134,30 @@ describe("HttpConnection", () => {
|
|||
|
||||
it("can stop a starting connection", async () => {
|
||||
await VerifyLogger.run(async (logger) => {
|
||||
const stoppingPromise = new PromiseSource();
|
||||
const startingPromise = new PromiseSource();
|
||||
const options: IHttpConnectionOptions = {
|
||||
...commonOptions,
|
||||
httpClient: new TestHttpClient()
|
||||
.on("POST", async () => {
|
||||
await connection.stop();
|
||||
startingPromise.resolve();
|
||||
await stoppingPromise;
|
||||
return "{}";
|
||||
})
|
||||
.on("GET", async () => {
|
||||
await connection.stop();
|
||||
return "";
|
||||
}),
|
||||
logger,
|
||||
} as IHttpConnectionOptions;
|
||||
|
||||
const connection = new HttpConnection("http://tempuri.org", options);
|
||||
|
||||
await expect(connection.start(TransferFormat.Text))
|
||||
const startPromise = connection.start(TransferFormat.Text);
|
||||
|
||||
await startingPromise;
|
||||
const stopPromise = connection.stop();
|
||||
stoppingPromise.resolve();
|
||||
|
||||
await stopPromise;
|
||||
|
||||
await expect(startPromise)
|
||||
.rejects
|
||||
.toThrow("The connection was stopped during negotiation.");
|
||||
},
|
||||
|
|
|
|||
|
|
@ -2,13 +2,17 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
import { DefaultReconnectPolicy } from "../src/DefaultReconnectPolicy";
|
||||
import { HttpConnection, INegotiateResponse } from "../src/HttpConnection";
|
||||
import { HubConnection, HubConnectionState } from "../src/HubConnection";
|
||||
import { IHttpConnectionOptions } from "../src/IHttpConnectionOptions";
|
||||
import { MessageType } from "../src/IHubProtocol";
|
||||
import { RetryContext } from "../src/IRetryPolicy";
|
||||
import { JsonHubProtocol } from "../src/JsonHubProtocol";
|
||||
|
||||
import { VerifyLogger } from "./Common";
|
||||
import { TestConnection } from "./TestConnection";
|
||||
import { TestHttpClient } from "./TestHttpClient";
|
||||
import { TestEvent, TestMessageEvent, TestWebSocket } from "./TestWebSocket";
|
||||
import { PromiseSource } from "./Utils";
|
||||
|
||||
describe("auto reconnect", () => {
|
||||
|
|
@ -785,4 +789,93 @@ describe("auto reconnect", () => {
|
|||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("can be stopped while restarting the underlying connection and negotiate throws", async () => {
|
||||
await VerifyLogger.run(async (logger) => {
|
||||
let onreconnectingCount = 0;
|
||||
let onreconnectedCount = 0;
|
||||
let closeCount = 0;
|
||||
|
||||
const nextRetryDelayCalledPromise = new PromiseSource();
|
||||
|
||||
const defaultConnectionId = "abc123";
|
||||
const defaultConnectionToken = "123abc";
|
||||
const defaultNegotiateResponse: INegotiateResponse = {
|
||||
availableTransports: [
|
||||
{ transport: "WebSockets", transferFormats: ["Text", "Binary"] },
|
||||
{ transport: "ServerSentEvents", transferFormats: ["Text"] },
|
||||
{ transport: "LongPolling", transferFormats: ["Text", "Binary"] },
|
||||
],
|
||||
connectionId: defaultConnectionId,
|
||||
connectionToken: defaultConnectionToken,
|
||||
negotiateVersion: 1,
|
||||
};
|
||||
|
||||
const startStarted = new PromiseSource();
|
||||
let negotiateCount = 0;
|
||||
|
||||
const options: IHttpConnectionOptions = {
|
||||
WebSocket: TestWebSocket,
|
||||
httpClient: new TestHttpClient()
|
||||
.on("POST", async () => {
|
||||
++negotiateCount;
|
||||
if (negotiateCount === 1) {
|
||||
return defaultNegotiateResponse;
|
||||
}
|
||||
startStarted.resolve();
|
||||
return Promise.reject("Error with negotiate");
|
||||
})
|
||||
.on("GET", () => ""),
|
||||
logger,
|
||||
} as IHttpConnectionOptions;
|
||||
|
||||
const connection = new HttpConnection("http://tempuri.org", options);
|
||||
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
|
||||
nextRetryDelayInMilliseconds() {
|
||||
nextRetryDelayCalledPromise.resolve();
|
||||
return 0;
|
||||
},
|
||||
});
|
||||
|
||||
hubConnection.onreconnecting(() => {
|
||||
onreconnectingCount++;
|
||||
});
|
||||
|
||||
hubConnection.onreconnected(() => {
|
||||
onreconnectedCount++;
|
||||
});
|
||||
|
||||
hubConnection.onclose(() => {
|
||||
closeCount++;
|
||||
});
|
||||
|
||||
TestWebSocket.webSocketSet = new PromiseSource();
|
||||
const startPromise = hubConnection.start();
|
||||
await TestWebSocket.webSocketSet;
|
||||
await TestWebSocket.webSocket.openSet;
|
||||
TestWebSocket.webSocket.onopen(new TestEvent());
|
||||
TestWebSocket.webSocket.onmessage(new TestMessageEvent("{}\x1e"));
|
||||
|
||||
await startPromise;
|
||||
TestWebSocket.webSocket.close();
|
||||
TestWebSocket.webSocketSet = new PromiseSource();
|
||||
|
||||
await nextRetryDelayCalledPromise;
|
||||
|
||||
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
|
||||
expect(onreconnectingCount).toBe(1);
|
||||
expect(onreconnectedCount).toBe(0);
|
||||
expect(closeCount).toBe(0);
|
||||
|
||||
await startStarted;
|
||||
await hubConnection.stop();
|
||||
|
||||
expect(hubConnection.state).toBe(HubConnectionState.Disconnected);
|
||||
expect(onreconnectingCount).toBe(1);
|
||||
expect(onreconnectedCount).toBe(0);
|
||||
expect(closeCount).toBe(1);
|
||||
},
|
||||
"Failed to complete negotiation with the server: Error with negotiate",
|
||||
"Failed to start the connection: Error with negotiate");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -221,3 +221,57 @@ export class TestCloseEvent {
|
|||
public CAPTURING_PHASE: number = 0;
|
||||
public NONE: number = 0;
|
||||
}
|
||||
|
||||
export class TestMessageEvent implements MessageEvent {
|
||||
constructor(data: any) {
|
||||
this.data = data;
|
||||
}
|
||||
public data: any;
|
||||
public lastEventId: string = "";
|
||||
public origin: string = "";
|
||||
public ports: MessagePort[] = [];
|
||||
public source: Window | null = null;
|
||||
public composed: boolean = false;
|
||||
public composedPath(): EventTarget[];
|
||||
public composedPath(): any[] {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public code: number = 0;
|
||||
public reason: string = "";
|
||||
public wasClean: boolean = false;
|
||||
public initMessageEvent(typeArg: string, canBubbleArg: boolean, cancelableArg: boolean, data: any, origin: string, lastEventId: string): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public bubbles: boolean = false;
|
||||
public cancelBubble: boolean = false;
|
||||
public cancelable: boolean = false;
|
||||
public currentTarget!: EventTarget;
|
||||
public defaultPrevented: boolean = false;
|
||||
public eventPhase: number = 0;
|
||||
public isTrusted: boolean = false;
|
||||
public returnValue: boolean = false;
|
||||
public scoped: boolean = false;
|
||||
public srcElement!: Element | null;
|
||||
public target!: EventTarget;
|
||||
public timeStamp: number = 0;
|
||||
public type: string = "";
|
||||
public deepPath(): EventTarget[] {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public initEvent(type: string, bubbles?: boolean | undefined, cancelable?: boolean | undefined): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public preventDefault(): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public stopImmediatePropagation(): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public stopPropagation(): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public AT_TARGET: number = 0;
|
||||
public BUBBLING_PHASE: number = 0;
|
||||
public CAPTURING_PHASE: number = 0;
|
||||
public NONE: number = 0;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue