Add "allowReconnect" to SignalR CloseMessages (#14908)
This commit is contained in:
parent
5dfb923c68
commit
a4af6185ea
|
|
@ -78,7 +78,7 @@ namespace Microsoft.AspNetCore.Components.Server.BlazorPack
|
|||
message = PingMessage.Instance;
|
||||
return true;
|
||||
case HubProtocolConstants.CloseMessageType:
|
||||
message = CreateCloseMessage(ref reader);
|
||||
message = CreateCloseMessage(ref reader, itemCount);
|
||||
return true;
|
||||
default:
|
||||
// Future protocol changes can add message types, old clients can ignore them
|
||||
|
|
@ -196,10 +196,23 @@ namespace Microsoft.AspNetCore.Components.Server.BlazorPack
|
|||
return ApplyHeaders(headers, new CancelInvocationMessage(invocationId));
|
||||
}
|
||||
|
||||
private static CloseMessage CreateCloseMessage(ref MessagePackReader reader)
|
||||
private static CloseMessage CreateCloseMessage(ref MessagePackReader reader, int itemCount)
|
||||
{
|
||||
var error = ReadString(ref reader, "error");
|
||||
return new CloseMessage(error);
|
||||
var allowReconnect = false;
|
||||
|
||||
if (itemCount > 2)
|
||||
{
|
||||
allowReconnect = ReadBoolean(ref reader, "allowReconnect");
|
||||
}
|
||||
|
||||
// An empty string is still an error
|
||||
if (error == null && !allowReconnect)
|
||||
{
|
||||
return CloseMessage.Empty;
|
||||
}
|
||||
|
||||
return new CloseMessage(error, allowReconnect);
|
||||
}
|
||||
|
||||
private static Dictionary<string, string> ReadHeaders(ref MessagePackReader reader)
|
||||
|
|
@ -515,7 +528,7 @@ namespace Microsoft.AspNetCore.Components.Server.BlazorPack
|
|||
|
||||
private void WriteCloseMessage(CloseMessage message, ref MessagePackWriter writer)
|
||||
{
|
||||
writer.WriteArrayHeader(2);
|
||||
writer.WriteArrayHeader(3);
|
||||
writer.Write(HubProtocolConstants.CloseMessageType);
|
||||
if (string.IsNullOrEmpty(message.Error))
|
||||
{
|
||||
|
|
@ -525,6 +538,8 @@ namespace Microsoft.AspNetCore.Components.Server.BlazorPack
|
|||
{
|
||||
writer.Write(message.Error);
|
||||
}
|
||||
|
||||
writer.Write(message.AllowReconnect);
|
||||
}
|
||||
|
||||
private void WritePingMessage(PingMessage _, ref MessagePackWriter writer)
|
||||
|
|
@ -559,6 +574,17 @@ namespace Microsoft.AspNetCore.Components.Server.BlazorPack
|
|||
return destination;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool ReadBoolean(ref MessagePackReader reader, string field)
|
||||
{
|
||||
if (reader.End || reader.NextMessagePackType != MessagePackType.Boolean)
|
||||
{
|
||||
ThrowInvalidDataException(field, "Boolean");
|
||||
}
|
||||
|
||||
return reader.ReadBoolean();
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static int ReadInt32(ref MessagePackReader reader, string field)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -878,7 +878,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
}
|
||||
}
|
||||
|
||||
private async Task<(bool close, Exception exception)> ProcessMessagesAsync(HubMessage message, ConnectionState connectionState, ChannelWriter<InvocationMessage> invocationMessageWriter)
|
||||
private async Task<CloseMessage> ProcessMessagesAsync(HubMessage message, ConnectionState connectionState, ChannelWriter<InvocationMessage> invocationMessageWriter)
|
||||
{
|
||||
Log.ResettingKeepAliveTimer(_logger);
|
||||
connectionState.ResetTimeout();
|
||||
|
|
@ -911,7 +911,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
if (!connectionState.TryGetInvocation(streamItem.InvocationId, out irq))
|
||||
{
|
||||
Log.DroppedStreamMessage(_logger, streamItem.InvocationId);
|
||||
return (close: false, exception: null);
|
||||
break;
|
||||
}
|
||||
await DispatchInvocationStreamItemAsync(streamItem, irq);
|
||||
break;
|
||||
|
|
@ -919,13 +919,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
if (string.IsNullOrEmpty(close.Error))
|
||||
{
|
||||
Log.ReceivedClose(_logger);
|
||||
return (close: true, exception: null);
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.ReceivedCloseWithError(_logger, close.Error);
|
||||
return (close: true, exception: new HubException($"The server closed the connection with the following error: {close.Error}"));
|
||||
}
|
||||
return close;
|
||||
case PingMessage _:
|
||||
Log.ReceivedPing(_logger);
|
||||
// timeout is reset above, on receiving any message
|
||||
|
|
@ -934,7 +933,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
|
||||
}
|
||||
|
||||
return (close: false, exception: null);
|
||||
return null;
|
||||
}
|
||||
|
||||
private async Task DispatchInvocationAsync(InvocationMessage invocation)
|
||||
|
|
@ -1150,25 +1149,33 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
{
|
||||
Log.ProcessingMessage(_logger, buffer.Length);
|
||||
|
||||
var close = false;
|
||||
CloseMessage closeMessage = null;
|
||||
|
||||
while (_protocol.TryParseMessage(ref buffer, connectionState, out var message))
|
||||
{
|
||||
Exception exception;
|
||||
|
||||
// We have data, process it
|
||||
(close, exception) = await ProcessMessagesAsync(message, connectionState, invocationMessageChannel.Writer);
|
||||
if (close)
|
||||
closeMessage = await ProcessMessagesAsync(message, connectionState, invocationMessageChannel.Writer);
|
||||
|
||||
if (closeMessage != null)
|
||||
{
|
||||
// Closing because we got a close frame, possibly with an error in it.
|
||||
connectionState.CloseException = exception;
|
||||
connectionState.Stopping = true;
|
||||
if (closeMessage.Error != null)
|
||||
{
|
||||
connectionState.CloseException = new HubException($"The server closed the connection with the following error: {closeMessage.Error}");
|
||||
}
|
||||
|
||||
// Stopping being true indicates the client shouldn't try to reconnect even if automatic reconnects are enabled.
|
||||
if (!closeMessage.AllowReconnect)
|
||||
{
|
||||
connectionState.Stopping = true;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If we're closing stop everything
|
||||
if (close)
|
||||
if (closeMessage != null)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
|
@ -1637,6 +1644,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
public Exception CloseException { get; set; }
|
||||
public CancellationToken UploadStreamToken { get; set; }
|
||||
|
||||
// Indicates the connection is stopping AND the client should NOT attempt to reconnect even if automatic reconnects are enabled.
|
||||
// This means either HubConnection.DisposeAsync/StopAsync was called OR a CloseMessage with AllowReconnects set to false was received.
|
||||
public bool Stopping
|
||||
{
|
||||
get => _stopping;
|
||||
|
|
|
|||
|
|
@ -368,6 +368,156 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CanBeInducedByCloseMessageWithAllowReconnectSet()
|
||||
{
|
||||
bool ExpectedErrors(WriteContext writeContext)
|
||||
{
|
||||
return writeContext.LoggerName == typeof(HubConnection).FullName &&
|
||||
(writeContext.EventId.Name == "ReceivedCloseWithError" ||
|
||||
writeContext.EventId.Name == "ReconnectingWithError");
|
||||
}
|
||||
|
||||
var failReconnectTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
using (StartVerifiableLog(ExpectedErrors))
|
||||
{
|
||||
var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
|
||||
var testConnectionFactory = default(ReconnectingConnectionFactory);
|
||||
|
||||
testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection());
|
||||
builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
|
||||
|
||||
var retryContexts = new List<RetryContext>();
|
||||
var mockReconnectPolicy = new Mock<IRetryPolicy>();
|
||||
mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
|
||||
{
|
||||
retryContexts.Add(context);
|
||||
return TimeSpan.Zero;
|
||||
});
|
||||
builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
|
||||
|
||||
await using var hubConnection = builder.Build();
|
||||
var reconnectingCount = 0;
|
||||
var reconnectedCount = 0;
|
||||
var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
hubConnection.Reconnecting += error =>
|
||||
{
|
||||
reconnectingCount++;
|
||||
reconnectingErrorTcs.SetResult(error);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
hubConnection.Reconnected += connectionId =>
|
||||
{
|
||||
reconnectedCount++;
|
||||
reconnectedConnectionIdTcs.SetResult(connectionId);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
hubConnection.Closed += error =>
|
||||
{
|
||||
closedErrorTcs.SetResult(error);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
await hubConnection.StartAsync().OrTimeout();
|
||||
|
||||
var currentConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
|
||||
await currentConnection.ReceiveJsonMessage(new
|
||||
{
|
||||
type = HubProtocolConstants.CloseMessageType,
|
||||
error = "Error!",
|
||||
allowReconnect = true,
|
||||
});
|
||||
|
||||
var reconnectingException = await reconnectingErrorTcs.Task.OrTimeout();
|
||||
var expectedMessage = "The server closed the connection with the following error: Error!";
|
||||
|
||||
Assert.Equal(expectedMessage, reconnectingException.Message);
|
||||
Assert.Single(retryContexts);
|
||||
Assert.Equal(expectedMessage, retryContexts[0].RetryReason.Message);
|
||||
Assert.Equal(0, retryContexts[0].PreviousRetryCount);
|
||||
Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
|
||||
|
||||
await reconnectedConnectionIdTcs.Task.OrTimeout();
|
||||
|
||||
await hubConnection.StopAsync().OrTimeout();
|
||||
|
||||
var closeError = await closedErrorTcs.Task.OrTimeout();
|
||||
Assert.Null(closeError);
|
||||
Assert.Equal(1, reconnectingCount);
|
||||
Assert.Equal(1, reconnectedCount);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CannotBeInducedByCloseMessageWithAllowReconnectOmitted()
|
||||
{
|
||||
bool ExpectedErrors(WriteContext writeContext)
|
||||
{
|
||||
return writeContext.LoggerName == typeof(HubConnection).FullName &&
|
||||
(writeContext.EventId.Name == "ReceivedCloseWithError" ||
|
||||
writeContext.EventId.Name == "ShutdownWithError");
|
||||
}
|
||||
|
||||
var failReconnectTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
using (StartVerifiableLog(ExpectedErrors))
|
||||
{
|
||||
var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
|
||||
var testConnectionFactory = default(ReconnectingConnectionFactory);
|
||||
|
||||
testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection());
|
||||
builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
|
||||
|
||||
var reconnectingCount = 0;
|
||||
var nextRetryDelayCallCount = 0;
|
||||
var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
var mockReconnectPolicy = new Mock<IRetryPolicy>();
|
||||
mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
|
||||
{
|
||||
nextRetryDelayCallCount++;
|
||||
return TimeSpan.Zero;
|
||||
});
|
||||
|
||||
builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
|
||||
|
||||
await using var hubConnection = builder.Build();
|
||||
|
||||
hubConnection.Reconnecting += error =>
|
||||
{
|
||||
reconnectingCount++;
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
hubConnection.Closed += error =>
|
||||
{
|
||||
closedErrorTcs.SetResult(error);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
await hubConnection.StartAsync().OrTimeout();
|
||||
|
||||
var currentConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
|
||||
await currentConnection.ReceiveJsonMessage(new
|
||||
{
|
||||
type = HubProtocolConstants.CloseMessageType,
|
||||
error = "Error!",
|
||||
});
|
||||
|
||||
var closeError = await closedErrorTcs.Task.OrTimeout();
|
||||
|
||||
Assert.Equal("The server closed the connection with the following error: Error!", closeError.Message);
|
||||
Assert.Equal(0, nextRetryDelayCallCount);
|
||||
Assert.Equal(0, reconnectingCount);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task EventsNotFiredIfFirstRetryDelayIsNull()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -124,6 +124,7 @@ export class MessagePackHubProtocol implements IHubProtocol {
|
|||
|
||||
return {
|
||||
// Close messages have no headers.
|
||||
allowReconnect: properties.length >= 3 ? properties[2] : undefined,
|
||||
error: properties[1],
|
||||
type: MessageType.Close,
|
||||
} as HubMessage;
|
||||
|
|
|
|||
|
|
@ -545,8 +545,18 @@ export class HubConnection {
|
|||
case MessageType.Close:
|
||||
this.logger.log(LogLevel.Information, "Close message received from server.");
|
||||
|
||||
// We don't want to wait on the stop itself.
|
||||
this.stopPromise = this.stopInternal(message.error ? new Error("Server returned an error on close: " + message.error) : undefined);
|
||||
const error = message.error ? new Error("Server returned an error on close: " + message.error) : undefined;
|
||||
|
||||
if (message.allowReconnect === true) {
|
||||
// It feels wrong not to await connection.stop() here, but processIncomingData is called as part of an onreceive callback which is not async,
|
||||
// this is already the behavior for serverTimeout(), and HttpConnection.Stop() should catch and log all possible exceptions.
|
||||
|
||||
// tslint:disable-next-line:no-floating-promises
|
||||
this.connection.stop(error);
|
||||
} else {
|
||||
// We cannot await stopInternal() here, but subsequent calls to stop() will await this if stopInternal() is still ongoing.
|
||||
this.stopPromise = this.stopInternal(error);
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
|
|
|
|||
|
|
@ -131,6 +131,9 @@ export interface CloseMessage extends HubMessageBase {
|
|||
* If this property is undefined, the connection was closed normally and without error.
|
||||
*/
|
||||
readonly error?: string;
|
||||
|
||||
/** If true, clients with automatic reconnects enabled should attempt to reconnect after receiving the CloseMessage. Otherwise, they should not. */
|
||||
readonly allowReconnect?: boolean;
|
||||
}
|
||||
|
||||
/** A hub message sent to request that a streaming invocation be canceled. */
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
import { DefaultReconnectPolicy } from "../src/DefaultReconnectPolicy";
|
||||
import { HubConnection, HubConnectionState } from "../src/HubConnection";
|
||||
import { MessageType } from "../src/IHubProtocol";
|
||||
import { RetryContext } from "../src/IRetryPolicy";
|
||||
import { JsonHubProtocol } from "../src/JsonHubProtocol";
|
||||
|
||||
|
|
@ -728,4 +729,60 @@ describe("auto reconnect", () => {
|
|||
expect(closeCount).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("reconnect on close message if allowReconnect is true and auto reconnect is enabled", async () => {
|
||||
await VerifyLogger.run(async (logger) => {
|
||||
const connection = new TestConnection();
|
||||
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy());
|
||||
try {
|
||||
let isReconnecting = false;
|
||||
let reconnectingError: Error | undefined;
|
||||
|
||||
hubConnection.onreconnecting((e) => {
|
||||
isReconnecting = true;
|
||||
reconnectingError = e;
|
||||
});
|
||||
|
||||
await hubConnection.start();
|
||||
|
||||
connection.receive({
|
||||
allowReconnect: true,
|
||||
error: "Error!",
|
||||
type: MessageType.Close,
|
||||
});
|
||||
|
||||
expect(isReconnecting).toEqual(true);
|
||||
expect(reconnectingError!.message).toEqual("Server returned an error on close: Error!");
|
||||
} finally {
|
||||
await hubConnection.stop();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("stop on close message if allowReconnect is missing and auto reconnect is enabled", async () => {
|
||||
await VerifyLogger.run(async (logger) => {
|
||||
const connection = new TestConnection();
|
||||
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy());
|
||||
try {
|
||||
let isClosed = false;
|
||||
let closeError: Error | undefined;
|
||||
hubConnection.onclose((e) => {
|
||||
isClosed = true;
|
||||
closeError = e;
|
||||
});
|
||||
|
||||
await hubConnection.start();
|
||||
|
||||
connection.receive({
|
||||
error: "Error!",
|
||||
type: MessageType.Close,
|
||||
});
|
||||
|
||||
expect(isClosed).toEqual(true);
|
||||
expect(closeError!.message).toEqual("Server returned an error on close: Error!");
|
||||
} finally {
|
||||
await hubConnection.stop();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -822,7 +822,9 @@ describe("HubConnection", () => {
|
|||
|
||||
await hubConnection.start();
|
||||
|
||||
// allowReconnect Should have no effect since auto reconnect is disabled by default.
|
||||
connection.receive({
|
||||
allowReconnect: true,
|
||||
error: "Error!",
|
||||
type: MessageType.Close,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
private static JsonEncodedText TypePropertyNameBytes = JsonEncodedText.Encode(TypePropertyName);
|
||||
private const string ErrorPropertyName = "error";
|
||||
private static JsonEncodedText ErrorPropertyNameBytes = JsonEncodedText.Encode(ErrorPropertyName);
|
||||
private const string AllowReconnectPropertyName = "allowReconnect";
|
||||
private static JsonEncodedText AllowReconnectPropertyNameBytes = JsonEncodedText.Encode(AllowReconnectPropertyName);
|
||||
private const string TargetPropertyName = "target";
|
||||
private static JsonEncodedText TargetPropertyNameBytes = JsonEncodedText.Encode(TargetPropertyName);
|
||||
private const string ArgumentsPropertyName = "arguments";
|
||||
|
|
@ -132,6 +134,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
ExceptionDispatchInfo argumentBindingException = null;
|
||||
Dictionary<string, string> headers = null;
|
||||
var completed = false;
|
||||
var allowReconnect = false;
|
||||
|
||||
var reader = new Utf8JsonReader(input, isFinalBlock: true, state: default);
|
||||
|
||||
|
|
@ -186,6 +189,10 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
{
|
||||
error = reader.ReadAsString(ErrorPropertyName);
|
||||
}
|
||||
else if (reader.ValueTextEquals(AllowReconnectPropertyNameBytes.EncodedUtf8Bytes))
|
||||
{
|
||||
allowReconnect = reader.ReadAsBoolean(AllowReconnectPropertyName);
|
||||
}
|
||||
else if (reader.ValueTextEquals(ResultPropertyNameBytes.EncodedUtf8Bytes))
|
||||
{
|
||||
hasResult = true;
|
||||
|
|
@ -372,7 +379,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
case HubProtocolConstants.PingMessageType:
|
||||
return PingMessage.Instance;
|
||||
case HubProtocolConstants.CloseMessageType:
|
||||
return BindCloseMessage(error);
|
||||
return BindCloseMessage(error, allowReconnect);
|
||||
case null:
|
||||
throw new InvalidDataException($"Missing required property '{TypePropertyName}'.");
|
||||
default:
|
||||
|
|
@ -544,6 +551,11 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
{
|
||||
writer.WriteString(ErrorPropertyNameBytes, message.Error);
|
||||
}
|
||||
|
||||
if (message.AllowReconnect)
|
||||
{
|
||||
writer.WriteBoolean(AllowReconnectPropertyNameBytes, true);
|
||||
}
|
||||
}
|
||||
|
||||
private void WriteArguments(object[] arguments, Utf8JsonWriter writer)
|
||||
|
|
@ -722,16 +734,15 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
return arguments ?? Array.Empty<object>();
|
||||
}
|
||||
|
||||
private CloseMessage BindCloseMessage(string error)
|
||||
private CloseMessage BindCloseMessage(string error, bool allowReconnect)
|
||||
{
|
||||
// An empty string is still an error
|
||||
if (error == null)
|
||||
if (error == null && !allowReconnect)
|
||||
{
|
||||
return CloseMessage.Empty;
|
||||
}
|
||||
|
||||
var message = new CloseMessage(error);
|
||||
return message;
|
||||
return new CloseMessage(error, allowReconnect);
|
||||
}
|
||||
|
||||
private HubMessage ApplyHeaders(HubMessage message, Dictionary<string, string> headers)
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
case HubProtocolConstants.PingMessageType:
|
||||
return PingMessage.Instance;
|
||||
case HubProtocolConstants.CloseMessageType:
|
||||
return CreateCloseMessage(input, ref startOffset);
|
||||
return CreateCloseMessage(input, ref startOffset, itemCount);
|
||||
default:
|
||||
// Future protocol changes can add message types, old clients can ignore them
|
||||
return null;
|
||||
|
|
@ -261,10 +261,23 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
return ApplyHeaders(headers, new CancelInvocationMessage(invocationId));
|
||||
}
|
||||
|
||||
private static CloseMessage CreateCloseMessage(byte[] input, ref int offset)
|
||||
private static CloseMessage CreateCloseMessage(byte[] input, ref int offset, int itemCount)
|
||||
{
|
||||
var error = ReadString(input, ref offset, "error");
|
||||
return new CloseMessage(error);
|
||||
var allowReconnect = false;
|
||||
|
||||
if (itemCount > 2)
|
||||
{
|
||||
allowReconnect = ReadBoolean(input, ref offset, "allowReconnect");
|
||||
}
|
||||
|
||||
// An empty string is still an error
|
||||
if (error == null && !allowReconnect)
|
||||
{
|
||||
return CloseMessage.Empty;
|
||||
}
|
||||
|
||||
return new CloseMessage(error, allowReconnect);
|
||||
}
|
||||
|
||||
private static Dictionary<string, string> ReadHeaders(byte[] input, ref int offset)
|
||||
|
|
@ -533,7 +546,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
|
||||
private void WriteCloseMessage(CloseMessage message, Stream packer)
|
||||
{
|
||||
MessagePackBinary.WriteArrayHeader(packer, 2);
|
||||
MessagePackBinary.WriteArrayHeader(packer, 3);
|
||||
MessagePackBinary.WriteInt16(packer, HubProtocolConstants.CloseMessageType);
|
||||
if (string.IsNullOrEmpty(message.Error))
|
||||
{
|
||||
|
|
@ -543,6 +556,8 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
{
|
||||
MessagePackBinary.WriteString(packer, message.Error);
|
||||
}
|
||||
|
||||
MessagePackBinary.WriteBoolean(packer, message.AllowReconnect);
|
||||
}
|
||||
|
||||
private void WritePingMessage(PingMessage pingMessage, Stream packer)
|
||||
|
|
@ -576,6 +591,23 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
return ReadString(input, ref offset, "invocationId");
|
||||
}
|
||||
|
||||
private static bool ReadBoolean(byte[] input, ref int offset, string field)
|
||||
{
|
||||
Exception msgPackException = null;
|
||||
try
|
||||
{
|
||||
var readBool = MessagePackBinary.ReadBoolean(input, offset, out var readSize);
|
||||
offset += readSize;
|
||||
return readBool;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
msgPackException = e;
|
||||
}
|
||||
|
||||
throw new InvalidDataException($"Reading '{field}' as Boolean failed.", msgPackException);
|
||||
}
|
||||
|
||||
private static int ReadInt32(byte[] input, ref int offset, string field)
|
||||
{
|
||||
Exception msgPackException = null;
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
private const string TargetPropertyName = "target";
|
||||
private const string ArgumentsPropertyName = "arguments";
|
||||
private const string HeadersPropertyName = "headers";
|
||||
private const string AllowReconnectPropertyName = "allowReconnect";
|
||||
|
||||
private static readonly string ProtocolName = "json";
|
||||
private static readonly int ProtocolVersion = 1;
|
||||
|
|
@ -131,6 +132,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
ExceptionDispatchInfo argumentBindingException = null;
|
||||
Dictionary<string, string> headers = null;
|
||||
var completed = false;
|
||||
var allowReconnect = false;
|
||||
|
||||
using (var reader = JsonUtils.CreateJsonTextReader(textReader))
|
||||
{
|
||||
|
|
@ -187,6 +189,9 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
case ErrorPropertyName:
|
||||
error = JsonUtils.ReadAsString(reader, ErrorPropertyName);
|
||||
break;
|
||||
case AllowReconnectPropertyName:
|
||||
allowReconnect = JsonUtils.ReadAsBoolean(reader, AllowReconnectPropertyName);
|
||||
break;
|
||||
case ResultPropertyName:
|
||||
hasResult = true;
|
||||
|
||||
|
|
@ -373,7 +378,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
case HubProtocolConstants.PingMessageType:
|
||||
return PingMessage.Instance;
|
||||
case HubProtocolConstants.CloseMessageType:
|
||||
return BindCloseMessage(error);
|
||||
return BindCloseMessage(error, allowReconnect);
|
||||
case null:
|
||||
throw new InvalidDataException($"Missing required property '{TypePropertyName}'.");
|
||||
default:
|
||||
|
|
@ -550,6 +555,12 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
writer.WritePropertyName(ErrorPropertyName);
|
||||
writer.WriteValue(message.Error);
|
||||
}
|
||||
|
||||
if (message.AllowReconnect)
|
||||
{
|
||||
writer.WritePropertyName(AllowReconnectPropertyName);
|
||||
writer.WriteValue(true);
|
||||
}
|
||||
}
|
||||
|
||||
private void WriteArguments(object[] arguments, JsonTextWriter writer)
|
||||
|
|
@ -733,16 +744,15 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
throw new JsonReaderException("Unexpected end when reading JSON");
|
||||
}
|
||||
|
||||
private CloseMessage BindCloseMessage(string error)
|
||||
private CloseMessage BindCloseMessage(string error, bool allowReconnect)
|
||||
{
|
||||
// An empty string is still an error
|
||||
if (error == null)
|
||||
if (error == null && !allowReconnect)
|
||||
{
|
||||
return CloseMessage.Empty;
|
||||
}
|
||||
|
||||
var message = new CloseMessage(error);
|
||||
return message;
|
||||
return new CloseMessage(error, allowReconnect);
|
||||
}
|
||||
|
||||
private object[] BindArguments(JArray args, IReadOnlyList<Type> paramTypes)
|
||||
|
|
|
|||
|
|
@ -114,6 +114,18 @@ namespace Microsoft.AspNetCore.Internal
|
|||
}
|
||||
}
|
||||
|
||||
public static bool ReadAsBoolean(JsonTextReader reader, string propertyName)
|
||||
{
|
||||
reader.Read();
|
||||
|
||||
if (reader.TokenType != JsonToken.Boolean || reader.Value == null)
|
||||
{
|
||||
throw new InvalidDataException($"Expected '{propertyName}' to be of type {JTokenType.Boolean}.");
|
||||
}
|
||||
|
||||
return Convert.ToBoolean(reader.Value, CultureInfo.InvariantCulture);
|
||||
}
|
||||
|
||||
public static int? ReadAsInt32(JsonTextReader reader, string propertyName)
|
||||
{
|
||||
reader.Read();
|
||||
|
|
|
|||
|
|
@ -57,6 +57,18 @@ namespace Microsoft.AspNetCore.Internal
|
|||
}
|
||||
}
|
||||
|
||||
public static bool ReadAsBoolean(this ref Utf8JsonReader reader, string propertyName)
|
||||
{
|
||||
reader.Read();
|
||||
|
||||
return reader.TokenType switch
|
||||
{
|
||||
JsonTokenType.False => false,
|
||||
JsonTokenType.True => true,
|
||||
_ => throw new InvalidDataException($"Expected '{propertyName}' to be true or false."),
|
||||
};
|
||||
}
|
||||
|
||||
public static string ReadAsString(this ref Utf8JsonReader reader, string propertyName)
|
||||
{
|
||||
reader.Read();
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
{
|
||||
public static readonly Microsoft.AspNetCore.SignalR.Protocol.CloseMessage Empty;
|
||||
public CloseMessage(string error) { }
|
||||
public CloseMessage(string error, bool allowReconnect) { }
|
||||
public bool AllowReconnect { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
|
||||
public string Error { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
|
||||
}
|
||||
public partial class CompletionMessage : Microsoft.AspNetCore.SignalR.Protocol.HubInvocationMessage
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
{
|
||||
public static readonly Microsoft.AspNetCore.SignalR.Protocol.CloseMessage Empty;
|
||||
public CloseMessage(string error) { }
|
||||
public CloseMessage(string error, bool allowReconnect) { }
|
||||
public bool AllowReconnect { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
|
||||
public string Error { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
|
||||
}
|
||||
public partial class CompletionMessage : Microsoft.AspNetCore.SignalR.Protocol.HubInvocationMessage
|
||||
|
|
|
|||
|
|
@ -12,9 +12,9 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
public class CloseMessage : HubMessage
|
||||
{
|
||||
/// <summary>
|
||||
/// An empty close message with no error.
|
||||
/// An empty close message with no error and <see cref="AllowReconnect"/> set to <see langword="false"/>.
|
||||
/// </summary>
|
||||
public static readonly CloseMessage Empty = new CloseMessage(null);
|
||||
public static readonly CloseMessage Empty = new CloseMessage(error: null, allowReconnect: false);
|
||||
|
||||
/// <summary>
|
||||
/// Gets the optional error message.
|
||||
|
|
@ -22,12 +22,32 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
|
|||
public string Error { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="CloseMessage"/> class with an optional error message.
|
||||
/// If <see langword="false"/>, clients with automatic reconnects enabled should not attempt to automatically reconnect after receiving the <see cref="CloseMessage"/>.
|
||||
/// </summary>
|
||||
public bool AllowReconnect { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="CloseMessage"/> class with an optional error message and <see cref="AllowReconnect"/> set to <see langword="false"/>.
|
||||
/// </summary>
|
||||
/// <param name="error">An optional error message.</param>
|
||||
public CloseMessage(string error)
|
||||
: this(error, allowReconnect: false)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="CloseMessage"/> class with an optional error message and a <see cref="bool"/> indicating whether or not a client with
|
||||
/// automatic reconnects enabled should attempt to reconnect upon receiving the message.
|
||||
/// </summary>
|
||||
/// <param name="error">An optional error message.</param>
|
||||
/// <param name="allowReconnect">
|
||||
/// <see langword="true"/>, if client with automatic reconnects enabled should attempt to reconnect after receiving the <see cref="CloseMessage"/>;
|
||||
/// <see langword="false"/>, if the client should not try to reconnect whether or not automatic reconnects are enabled.
|
||||
/// </param>
|
||||
public CloseMessage(string error, bool allowReconnect)
|
||||
{
|
||||
Error = error;
|
||||
AllowReconnect = allowReconnect;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,6 +67,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
|
|||
new JsonProtocolTestData("CloseMessage_HasError", new CloseMessage("Error!"), false, true, "{\"type\":7,\"error\":\"Error!\"}"),
|
||||
new JsonProtocolTestData("CloseMessage_HasErrorEmptyString", new CloseMessage(""), false, true, "{\"type\":7,\"error\":\"\"}"),
|
||||
new JsonProtocolTestData("CloseMessage_HasErrorWithCamelCase", new CloseMessage("Error!"), true, true, "{\"type\":7,\"error\":\"Error!\"}"),
|
||||
new JsonProtocolTestData("CloseMessage_HasAllowReconnect", new CloseMessage(error: null, allowReconnect: true), true, true, "{\"type\":7,\"allowReconnect\":true}"),
|
||||
new JsonProtocolTestData("CloseMessage_HasErrorAndAllowReconnect", new CloseMessage("Error!", allowReconnect: true), true, true, "{\"type\":7,\"error\":\"Error!\",\"allowReconnect\":true}"),
|
||||
|
||||
}.ToDictionary(t => t.Name);
|
||||
|
||||
|
|
|
|||
|
|
@ -178,6 +178,24 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
|
|||
name: "Ping",
|
||||
message: PingMessage.Instance,
|
||||
binary: "kQY="),
|
||||
|
||||
// Close Messages
|
||||
new ProtocolTestData(
|
||||
name: "CloseMessage",
|
||||
message: CloseMessage.Empty,
|
||||
binary: "kwfAwg=="),
|
||||
new ProtocolTestData(
|
||||
name: "CloseMessage_HasError",
|
||||
message: new CloseMessage("Error!"),
|
||||
binary: "kwemRXJyb3Ihwg=="),
|
||||
new ProtocolTestData(
|
||||
name: "CloseMessage_HasAllowReconnect",
|
||||
message: new CloseMessage(error: null, allowReconnect: true),
|
||||
binary: "kwfAww=="),
|
||||
new ProtocolTestData(
|
||||
name: "CloseMessage_HasErrorAndAllowReconnect",
|
||||
message: new CloseMessage("Error!", allowReconnect: true),
|
||||
binary: "kwemRXJyb3Ihww=="),
|
||||
}.ToDictionary(t => t.Name);
|
||||
|
||||
[Theory]
|
||||
|
|
|
|||
|
|
@ -482,6 +482,7 @@ A `Close` message is a JSON object with the following properties
|
|||
|
||||
* `type` - A `Number` with the literal value `7`, indicating that this message is a `Close`.
|
||||
* `error` - An optional `String` encoding the error message.
|
||||
* `allowReconnect` - An optional `Boolean` indicating to clients with automatic reconnects enabled that they should attempt to reconnect after receiving the message.
|
||||
|
||||
Example - A `Close` message without an error
|
||||
```json
|
||||
|
|
@ -498,6 +499,15 @@ Example - A `Close` message with an error
|
|||
}
|
||||
```
|
||||
|
||||
Example - A `Close` message with an error that allows automatic client reconnects.
|
||||
```json
|
||||
{
|
||||
"type": 7,
|
||||
"error": "Connection closed because of an error!",
|
||||
"allowReconnect": true
|
||||
}
|
||||
```
|
||||
|
||||
### JSON Header Encoding
|
||||
|
||||
Message headers are encoded into a JSON object, with string values, that are stored in the `headers` property. For example:
|
||||
|
|
@ -809,11 +819,12 @@ is decoded as follows:
|
|||
`Close` messages have the following structure
|
||||
|
||||
```
|
||||
[7, Error]
|
||||
[7, Error, AllowReconnect?]
|
||||
```
|
||||
|
||||
* `7` - Message Type - `7` indicates this is a `Close` message.
|
||||
* `Error` - Error - A `String` encoding the error for the message.
|
||||
* `AllowReconnect` - An optional `Boolean` indicating to clients with automatic reconnects enabled that they should attempt to reconnect after receiving the message.
|
||||
|
||||
Examples:
|
||||
|
||||
|
|
@ -833,6 +844,23 @@ is decoded as follows:
|
|||
* `0x79` - `y`
|
||||
* `0x7a` - `z`
|
||||
|
||||
#### Close message that allows automatic client reconnects
|
||||
|
||||
The following payload:
|
||||
```
|
||||
0x93 0x07 0xa3 0x78 0x79 0x7a 0xc3
|
||||
```
|
||||
|
||||
is decoded as follows:
|
||||
|
||||
* `0x93` - 3-element array
|
||||
* `0x07` - `7` (Message Type - `Close` message)
|
||||
* `0xa3` - string of length 3 (Error)
|
||||
* `0x78` - `x`
|
||||
* `0x79` - `y`
|
||||
* `0x7a` - `z`
|
||||
* `0xc3` - `True` (AllowReconnect)
|
||||
|
||||
### MessagePack Headers Encoding
|
||||
|
||||
Headers are encoded in MessagePack messages as a Map that immediately follows the type value. The Map can be empty, in which case it is represented by the byte `0x80`. If there are items in the map,
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
private ReadOnlyMemory<byte> _cachedPingMessage;
|
||||
private bool _clientTimeoutActive;
|
||||
private bool _connectionAborted;
|
||||
private volatile bool _allowReconnect = true;
|
||||
private int _streamBufferCapacity;
|
||||
private long? _maxMessageSize;
|
||||
|
||||
|
|
@ -106,6 +107,9 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
/// </summary>
|
||||
public virtual IDictionary<object, object> Items => _connectionContext.Items;
|
||||
|
||||
// Used by HubConnectionHandler to determine whether to set CloseMessage.AllowReconnect.
|
||||
internal bool AllowReconnect => _allowReconnect;
|
||||
|
||||
// Used by HubConnectionHandler
|
||||
internal PipeReader Input => _connectionContext.Transport.Input;
|
||||
|
||||
|
|
@ -201,7 +205,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
|
||||
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
|
||||
}
|
||||
|
|
@ -220,7 +224,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
|
||||
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
|
||||
}
|
||||
|
|
@ -236,7 +240,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -262,7 +266,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
catch (Exception ex)
|
||||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -287,7 +291,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
catch (Exception ex)
|
||||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -323,7 +327,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
catch (Exception ex)
|
||||
{
|
||||
Log.FailedWritingMessage(_logger, ex);
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -358,6 +362,12 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
/// Aborts the connection.
|
||||
/// </summary>
|
||||
public virtual void Abort()
|
||||
{
|
||||
_allowReconnect = false;
|
||||
AbortAllowReconnect();
|
||||
}
|
||||
|
||||
private void AbortAllowReconnect()
|
||||
{
|
||||
_connectionAborted = true;
|
||||
|
||||
|
|
@ -514,7 +524,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
// Used by the HubConnectionHandler only
|
||||
internal Task AbortAsync()
|
||||
{
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
return _abortCompletedTcs.Task;
|
||||
}
|
||||
|
||||
|
|
@ -560,7 +570,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
if (!_receivedMessageThisInterval)
|
||||
{
|
||||
Log.ClientTimeout(_logger, TimeSpan.FromTicks(_clientTimeoutInterval));
|
||||
Abort();
|
||||
AbortAllowReconnect();
|
||||
}
|
||||
|
||||
_receivedMessageThisInterval = false;
|
||||
|
|
|
|||
|
|
@ -126,7 +126,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
{
|
||||
Log.ErrorDispatchingHubEvent(_logger, "OnConnectedAsync", ex);
|
||||
|
||||
await SendCloseAsync(connection, ex);
|
||||
// The client shouldn't try to reconnect given an error in OnConnected.
|
||||
await SendCloseAsync(connection, ex, allowReconnect: false);
|
||||
|
||||
// return instead of throw to let close message send successfully
|
||||
return;
|
||||
|
|
@ -157,7 +158,7 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
private async Task HubOnDisconnectedAsync(HubConnectionContext connection, Exception exception)
|
||||
{
|
||||
// send close message before aborting the connection
|
||||
await SendCloseAsync(connection, exception);
|
||||
await SendCloseAsync(connection, exception, connection.AllowReconnect);
|
||||
|
||||
// We wait on abort to complete, this is so that we can guarantee that all callbacks have fired
|
||||
// before OnDisconnectedAsync
|
||||
|
|
@ -176,14 +177,18 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
}
|
||||
}
|
||||
|
||||
private async Task SendCloseAsync(HubConnectionContext connection, Exception exception)
|
||||
private async Task SendCloseAsync(HubConnectionContext connection, Exception exception, bool allowReconnect)
|
||||
{
|
||||
var closeMessage = CloseMessage.Empty;
|
||||
|
||||
if (exception != null)
|
||||
{
|
||||
var errorMessage = ErrorMessageHelper.BuildErrorMessage("Connection closed with an error.", exception, _enableDetailedErrors);
|
||||
closeMessage = new CloseMessage(errorMessage);
|
||||
closeMessage = new CloseMessage(errorMessage, allowReconnect);
|
||||
}
|
||||
else if (allowReconnect)
|
||||
{
|
||||
closeMessage = new CloseMessage(error: null, allowReconnect);
|
||||
}
|
||||
|
||||
try
|
||||
|
|
|
|||
Loading…
Reference in New Issue