parent
15c3bca8e6
commit
a8330067c4
|
|
@ -24,6 +24,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
{
|
||||
public class HubConnection
|
||||
{
|
||||
public static readonly TimeSpan DefaultServerTimeout = TimeSpan.FromSeconds(30); // Server ping rate is 15 sec, this is 2 times that.
|
||||
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ILogger _logger;
|
||||
private readonly IConnection _connection;
|
||||
|
|
@ -38,9 +40,17 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
|
||||
private int _nextId = 0;
|
||||
private volatile bool _startCalled;
|
||||
private Timer _timeoutTimer;
|
||||
private bool _needKeepAlive;
|
||||
|
||||
public Task Closed { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the server timeout interval for the connection. Changes to this value
|
||||
/// will not be applied until the Keep Alive timer is next reset.
|
||||
/// </summary>
|
||||
public TimeSpan ServerTimeout { get; set; } = DefaultServerTimeout;
|
||||
|
||||
public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFactory loggerFactory)
|
||||
{
|
||||
if (connection == null)
|
||||
|
|
@ -64,6 +74,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
Shutdown(task.Exception);
|
||||
return task;
|
||||
}).Unwrap();
|
||||
|
||||
// Create the timer for timeout, but disabled by default (we enable it when started).
|
||||
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
|
||||
}
|
||||
|
||||
public async Task StartAsync()
|
||||
|
|
@ -78,6 +91,20 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
}
|
||||
}
|
||||
|
||||
private void TimeoutElapsed()
|
||||
{
|
||||
_connection.AbortAsync(new TimeoutException($"Server timeout ({ServerTimeout.TotalMilliseconds:0.00}ms) elapsed without receiving a message from the server."));
|
||||
}
|
||||
|
||||
private void ResetTimeoutTimer()
|
||||
{
|
||||
if (_needKeepAlive)
|
||||
{
|
||||
_logger.ResettingKeepAliveTimer();
|
||||
_timeoutTimer.Change(ServerTimeout, Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task StartAsyncCore()
|
||||
{
|
||||
var transferModeFeature = _connection.Features.Get<ITransferModeFeature>();
|
||||
|
|
@ -94,6 +121,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
|
||||
transferModeFeature.TransferMode = requestedTransferMode;
|
||||
await _connection.StartAsync();
|
||||
_needKeepAlive = _connection.Features.Get<IConnectionInherentKeepAliveFeature>() == null;
|
||||
var actualTransferMode = transferModeFeature.TransferMode;
|
||||
|
||||
_protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode));
|
||||
|
|
@ -105,6 +133,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
|
||||
await _connection.SendAsync(memoryStream.ToArray(), _connectionActive.Token);
|
||||
}
|
||||
|
||||
ResetTimeoutTimer();
|
||||
}
|
||||
|
||||
private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, TransferMode actualTransferMode)
|
||||
|
|
@ -125,6 +155,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
|
||||
private async Task DisposeAsyncCore()
|
||||
{
|
||||
_timeoutTimer.Dispose();
|
||||
await _connection.DisposeAsync();
|
||||
await Closed;
|
||||
}
|
||||
|
|
@ -298,6 +329,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
|
|||
|
||||
private async Task OnDataReceivedAsync(byte[] data)
|
||||
{
|
||||
ResetTimeoutTimer();
|
||||
if (_protocolReaderWriter.ReadMessages(data, _binder, out var messages))
|
||||
{
|
||||
foreach (var message in messages)
|
||||
|
|
|
|||
|
|
@ -85,6 +85,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Internal
|
|||
private static readonly Action<ILogger, string, string, string, int, Exception> _preparingStreamingInvocation =
|
||||
LoggerMessage.Define<string, string, string, int>(LogLevel.Trace, new EventId(24, nameof(PreparingStreamingInvocation)), "Preparing streaming invocation '{invocationId}' of '{target}', with return type '{returnType}' and {argumentCount} argument(s).");
|
||||
|
||||
private static readonly Action<ILogger, Exception> _resettingKeepAliveTimer =
|
||||
LoggerMessage.Define(LogLevel.Trace, new EventId(25, nameof(ResettingKeepAliveTimer)), "Resetting keep-alive timer, received a message from the server.");
|
||||
|
||||
// Category: Streaming and NonStreaming
|
||||
private static readonly Action<ILogger, string, Exception> _invocationCreated =
|
||||
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(0, nameof(InvocationCreated)), "Invocation {invocationId} created.");
|
||||
|
|
@ -282,7 +285,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Internal
|
|||
|
||||
public static void ErrorInvokingClientSideMethod(this ILogger logger, string methodName, Exception exception)
|
||||
{
|
||||
_errorInvokingClientSideMethod(logger, methodName, exception);
|
||||
_errorInvokingClientSideMethod(logger, methodName, exception);
|
||||
}
|
||||
|
||||
public static void ResettingKeepAliveTimer(this ILogger logger)
|
||||
{
|
||||
_resettingKeepAliveTimer(logger, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
<PropertyGroup>
|
||||
<Description>Client for ASP.NET Core SignalR</Description>
|
||||
<TargetFramework>netstandard2.0</TargetFramework>
|
||||
<RootNamespace>Microsoft.AspNetCore.SignalR.Client</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
|
||||
{
|
||||
internal static class HubProtocolConstants
|
||||
public static class HubProtocolConstants
|
||||
{
|
||||
public const int InvocationMessageType = 1;
|
||||
public const int StreamItemMessageType = 2;
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
|
|
@ -13,6 +13,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
Task StartAsync();
|
||||
Task SendAsync(byte[] data, CancellationToken cancellationToken);
|
||||
Task DisposeAsync();
|
||||
Task AbortAsync(Exception ex);
|
||||
|
||||
IDisposable OnReceived(Func<byte[], object, Task> callback, object state);
|
||||
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
_logger = _loggerFactory.CreateLogger<HttpConnection>();
|
||||
_httpOptions = httpOptions;
|
||||
_httpClient = _httpOptions?.HttpMessageHandler == null ? new HttpClient() : new HttpClient(_httpOptions?.HttpMessageHandler);
|
||||
_httpClient.Timeout = HttpClientTimeout;
|
||||
_httpClient.Timeout = HttpClientTimeout;
|
||||
_transportFactory = transportFactory ?? throw new ArgumentNullException(nameof(transportFactory));
|
||||
}
|
||||
|
||||
|
|
@ -303,7 +303,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
// Start the transport, giving it one end of the pipeline
|
||||
try
|
||||
{
|
||||
await _transport.StartAsync(connectUrl, applicationSide, requestedTransferMode: GetTransferMode(), connectionId: _connectionId);
|
||||
await _transport.StartAsync(connectUrl, applicationSide, GetTransferMode(), _connectionId, this);
|
||||
|
||||
// actual transfer mode can differ from the one that was requested so set it on the feature
|
||||
Debug.Assert(_transport.Mode.HasValue, "transfer mode not set after transport started");
|
||||
|
|
@ -435,11 +435,25 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
}
|
||||
}
|
||||
|
||||
public async Task AbortAsync(Exception ex) => await DisposeAsyncCore(ex ?? new InvalidOperationException("Connection aborted")).ForceAsync();
|
||||
|
||||
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
|
||||
|
||||
private async Task DisposeAsyncCore()
|
||||
private async Task DisposeAsyncCore(Exception ex = null)
|
||||
{
|
||||
_logger.StoppingClient(_connectionId);
|
||||
if (ex != null)
|
||||
{
|
||||
_logger.AbortingClient(_connectionId, ex);
|
||||
|
||||
// Immediately fault the close task. When the transport shuts down,
|
||||
// it will trigger the close task to be completed, so we want it to be
|
||||
// marked faulted before that happens
|
||||
_closedTcs.TrySetException(ex);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.StoppingClient(_connectionId);
|
||||
}
|
||||
|
||||
if (Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected) == ConnectionState.Initial)
|
||||
{
|
||||
|
|
@ -472,6 +486,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
await _receiveLoopTask;
|
||||
}
|
||||
|
||||
// If we haven't already done so, trigger the Closed task.
|
||||
_closedTcs.TrySetResult(null);
|
||||
_httpClient?.Dispose();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
{
|
||||
public interface ITransport
|
||||
{
|
||||
Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId);
|
||||
Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection);
|
||||
Task StopAsync();
|
||||
TransferMode? Mode { get; }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -153,6 +153,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
|
|||
private static readonly Action<ILogger, DateTime, string, string, Exception> _exceptionThrownFromCallback =
|
||||
LoggerMessage.Define<DateTime, string, string>(LogLevel.Error, new EventId(19, nameof(ExceptionThrownFromCallback)), "{time}: Connection Id {connectionId}: An exception was thrown from the '{callback}' callback");
|
||||
|
||||
private static readonly Action<ILogger, DateTime, string, Exception> _abortingClient =
|
||||
LoggerMessage.Define<DateTime, string>(LogLevel.Error, new EventId(20, nameof(AbortingClient)), "{time}: Connection Id {connectionId}: Aborting client.");
|
||||
|
||||
|
||||
public static void StartTransport(this ILogger logger, string connectionId, TransferMode transferMode)
|
||||
{
|
||||
|
|
@ -506,6 +509,14 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
|
|||
}
|
||||
}
|
||||
|
||||
public static void AbortingClient(this ILogger logger, string connectionId, Exception ex)
|
||||
{
|
||||
if (logger.IsEnabled(LogLevel.Error))
|
||||
{
|
||||
_abortingClient(logger, DateTime.Now, connectionId, ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static void StoppingClient(this ILogger logger, string connectionId)
|
||||
{
|
||||
if (logger.IsEnabled(LogLevel.Information))
|
||||
|
|
|
|||
|
|
@ -2,14 +2,14 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Sockets.Client.Http;
|
||||
using Microsoft.AspNetCore.Sockets.Client.Internal;
|
||||
using Microsoft.AspNetCore.Sockets.Features;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
|
||||
|
|
@ -42,13 +42,15 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
|
||||
}
|
||||
|
||||
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
|
||||
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
|
||||
{
|
||||
if (requestedTransferMode != TransferMode.Binary && requestedTransferMode != TransferMode.Text)
|
||||
{
|
||||
throw new ArgumentException("Invalid transfer mode.", nameof(requestedTransferMode));
|
||||
}
|
||||
|
||||
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new ConnectionInherentKeepAliveFeature(_httpClient.Timeout));
|
||||
|
||||
_application = application;
|
||||
Mode = requestedTransferMode;
|
||||
_connectionId = connectionId;
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
|
||||
}
|
||||
|
||||
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
|
||||
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
|
||||
{
|
||||
if (requestedTransferMode != TransferMode.Binary && requestedTransferMode != TransferMode.Text)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<WebSocketsTransport>();
|
||||
}
|
||||
|
||||
public async Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
|
||||
public async Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
|
||||
{
|
||||
if (url == null)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
releaseDisposeTcs.SetResult(null);
|
||||
await disposeTask.OrTimeout();
|
||||
|
||||
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()), Times.Never);
|
||||
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()), Times.Never);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
|
@ -263,8 +263,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
|
||||
var mockTransport = new Mock<ITransport>();
|
||||
Channel<byte[], SendMessage> channel = null;
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
|
||||
{
|
||||
channel = c;
|
||||
return Task.CompletedTask;
|
||||
|
|
@ -311,8 +311,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
|
||||
var mockTransport = new Mock<ITransport>();
|
||||
Channel<byte[], SendMessage> channel = null;
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
|
||||
{
|
||||
channel = c;
|
||||
return Task.CompletedTask;
|
||||
|
|
@ -368,8 +368,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
|
||||
var mockTransport = new Mock<ITransport>();
|
||||
Channel<byte[], SendMessage> channel = null;
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
|
||||
{
|
||||
channel = c;
|
||||
return Task.CompletedTask;
|
||||
|
|
@ -413,8 +413,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
|
||||
var mockTransport = new Mock<ITransport>();
|
||||
Channel<byte[], SendMessage> channel = null;
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
|
||||
{
|
||||
channel = c;
|
||||
return Task.CompletedTask;
|
||||
|
|
@ -925,8 +925,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
|
||||
var mockTransport = new Mock<ITransport>();
|
||||
Channel<byte[], SendMessage> channel = null;
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
|
||||
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
|
||||
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
|
||||
{
|
||||
channel = c;
|
||||
return Task.CompletedTask;
|
||||
|
|
@ -947,7 +947,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
await connection.DisposeAsync().OrTimeout();
|
||||
|
||||
mockTransport.Verify(t => t.StartAsync(
|
||||
It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), TransferMode.Text, It.IsAny<string>()), Times.Once);
|
||||
It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), TransferMode.Text, It.IsAny<string>(), It.IsAny<IConnection>()), Times.Once);
|
||||
Assert.NotNull(transferModeFeature);
|
||||
Assert.Equal(TransferMode.Binary, transferModeFeature.TransferMode);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ using Microsoft.AspNetCore.Http.Features;
|
|||
using Microsoft.AspNetCore.SignalR.Internal;
|
||||
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
|
||||
using Microsoft.AspNetCore.Sockets.Client;
|
||||
using Microsoft.AspNetCore.Sockets.Features;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
|
@ -33,6 +34,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
public async Task DisposeAsyncCallsConnectionStart()
|
||||
{
|
||||
var connection = new Mock<IConnection>();
|
||||
connection.Setup(m => m.Features).Returns(new FeatureCollection());
|
||||
connection.Setup(m => m.StartAsync()).Verifiable();
|
||||
var hubConnection = new HubConnection(connection.Object, Mock.Of<IHubProtocol>(), null);
|
||||
await hubConnection.DisposeAsync();
|
||||
|
|
@ -185,6 +187,19 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
await Assert.ThrowsAsync<InvalidOperationException>(async () => await invokeTask);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ConnectionTerminatedIfServerTimeoutIntervalElapsesWithNoMessages()
|
||||
{
|
||||
var connection = new TestConnection();
|
||||
var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory());
|
||||
|
||||
hubConnection.ServerTimeout = TimeSpan.FromMilliseconds(100);
|
||||
|
||||
await hubConnection.StartAsync().OrTimeout();
|
||||
var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await hubConnection.Closed.OrTimeout());
|
||||
Assert.Equal("Server timeout (100.00ms) elapsed without receiving a message from the server.", ex.Message);
|
||||
}
|
||||
|
||||
// Moq really doesn't handle out parameters well, so to make these tests work I added a manual mock -anurse
|
||||
private class MockHubProtocol : IHubProtocol
|
||||
{
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ using System.Text;
|
|||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.SignalR.Client.Tests;
|
||||
using Microsoft.AspNetCore.Sockets;
|
||||
using Microsoft.AspNetCore.Sockets.Client;
|
||||
using Microsoft.AspNetCore.Sockets.Internal;
|
||||
|
|
@ -43,7 +44,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
|
||||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
transportActiveTask = longPollingTransport.Running;
|
||||
|
||||
|
|
@ -79,7 +80,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
|
||||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = ChannelConnection.Create(connectionToTransport, transportToConnection);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
await longPollingTransport.Running.OrTimeout();
|
||||
Assert.True(transportToConnection.Reader.Completion.IsCompleted);
|
||||
|
|
@ -132,7 +133,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
|
||||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
var data = await transportToConnection.Reader.ReadAllAsync().OrTimeout();
|
||||
await longPollingTransport.Running.OrTimeout();
|
||||
|
|
@ -168,7 +169,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
|
||||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
var exception =
|
||||
await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.Reader.Completion.OrTimeout());
|
||||
|
|
@ -204,7 +205,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
|
||||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
await connectionToTransport.Writer.WriteAsync(new SendMessage());
|
||||
|
||||
|
|
@ -245,7 +246,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
|
||||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
connectionToTransport.Writer.Complete();
|
||||
|
||||
|
|
@ -296,7 +297,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
|
||||
// Start the transport
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
// Wait for the transport to finish
|
||||
await longPollingTransport.Running.OrTimeout();
|
||||
|
|
@ -361,7 +362,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
await connectionToTransport.Writer.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("World"), tcs2)).OrTimeout();
|
||||
|
||||
// Start the transport
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
connectionToTransport.Writer.Complete();
|
||||
|
||||
|
|
@ -404,7 +405,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
|
||||
Assert.Null(longPollingTransport.Mode);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, transferMode, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, transferMode, connectionId: string.Empty, connection: new TestConnection());
|
||||
Assert.Equal(transferMode, longPollingTransport.Mode);
|
||||
}
|
||||
finally
|
||||
|
|
@ -430,7 +431,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
{
|
||||
var longPollingTransport = new LongPollingTransport(httpClient);
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
|
||||
longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), null, TransferMode.Text | TransferMode.Binary, connectionId: string.Empty));
|
||||
longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), null, TransferMode.Text | TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection()));
|
||||
|
||||
Assert.Contains("Invalid transfer mode.", exception.Message);
|
||||
Assert.Equal("requestedTransferMode", exception.ParamName);
|
||||
|
|
@ -468,7 +469,7 @@ namespace Microsoft.AspNetCore.Client.Tests
|
|||
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
|
||||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
|
||||
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty, connection: new TestConnection());
|
||||
|
||||
var completedTask = await Task.WhenAny(completionTcs.Task, longPollingTransport.Running).OrTimeout();
|
||||
Assert.Equal(completionTcs.Task, completedTask);
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await sseTransport.StartAsync(
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
|
||||
await eventStreamTcs.Task.OrTimeout();
|
||||
await sseTransport.StopAsync().OrTimeout();
|
||||
|
|
@ -108,7 +108,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await sseTransport.StartAsync(
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
|
||||
transportActiveTask = sseTransport.Running;
|
||||
Assert.False(transportActiveTask.IsCompleted);
|
||||
|
|
@ -156,7 +156,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await sseTransport.StartAsync(
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
|
||||
var exception = await Assert.ThrowsAsync<FormatException>(() => sseTransport.Running.OrTimeout());
|
||||
Assert.Equal("Incomplete message.", exception.Message);
|
||||
|
|
@ -202,7 +202,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
|
||||
await sseTransport.StartAsync(
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
await eventStreamTcs.Task;
|
||||
|
||||
var sendTcs = new TaskCompletionSource<object>();
|
||||
|
|
@ -249,7 +249,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
|
||||
await sseTransport.StartAsync(
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
await eventStreamTcs.Task.OrTimeout();
|
||||
|
||||
connectionToTransport.Writer.TryComplete(null);
|
||||
|
|
@ -278,7 +278,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
await sseTransport.StartAsync(
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
|
||||
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
|
||||
var message = await transportToConnection.Reader.ReadAsync().AsTask().OrTimeout();
|
||||
Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
|
||||
|
|
@ -308,7 +308,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
Assert.Null(sseTransport.Mode);
|
||||
await sseTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, transferMode, connectionId: string.Empty).OrTimeout();
|
||||
await sseTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, transferMode, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
Assert.Equal(TransferMode.Text, sseTransport.Mode);
|
||||
await sseTransport.StopAsync().OrTimeout();
|
||||
}
|
||||
|
|
@ -333,7 +333,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
var transportToConnection = Channel.CreateUnbounded<byte[]>();
|
||||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
|
||||
sseTransport.StartAsync(new Uri("http://fakeuri.org"), null, TransferMode.Text | TransferMode.Binary, connectionId: string.Empty));
|
||||
sseTransport.StartAsync(new Uri("http://fakeuri.org"), null, TransferMode.Text | TransferMode.Binary, connectionId: string.Empty, connection: Mock.Of<IConnection>()));
|
||||
|
||||
Assert.Contains("Invalid transfer mode.", exception.Message);
|
||||
Assert.Equal("requestedTransferMode", exception.ParamName);
|
||||
|
|
|
|||
|
|
@ -48,9 +48,22 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
_receiveLoop = ReceiveLoopAsync(_receiveShutdownToken.Token);
|
||||
}
|
||||
|
||||
public Task DisposeAsync()
|
||||
public Task AbortAsync(Exception ex) => DisposeCoreAsync(ex);
|
||||
public Task DisposeAsync() => DisposeCoreAsync();
|
||||
|
||||
private Task DisposeCoreAsync(Exception ex = null)
|
||||
{
|
||||
_disposed.TrySetResult(null);
|
||||
if (ex == null)
|
||||
{
|
||||
_closeTcs.TrySetResult(null);
|
||||
_disposed.TrySetResult(null);
|
||||
}
|
||||
else
|
||||
{
|
||||
_closeTcs.TrySetException(ex);
|
||||
_disposed.TrySetException(ex);
|
||||
}
|
||||
|
||||
_receiveShutdownToken.Cancel();
|
||||
return _receiveLoop;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ using Microsoft.AspNetCore.Sockets.Client;
|
|||
using Microsoft.AspNetCore.Sockets.Internal;
|
||||
using Microsoft.AspNetCore.Testing.xunit;
|
||||
using Microsoft.Extensions.Logging.Testing;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
using Xunit.Abstractions;
|
||||
|
||||
|
|
@ -41,7 +42,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
|
||||
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
|
||||
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection,
|
||||
TransferMode.Binary, connectionId: string.Empty).OrTimeout();
|
||||
TransferMode.Binary, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
await webSocketsTransport.StopAsync().OrTimeout();
|
||||
await webSocketsTransport.Running.OrTimeout();
|
||||
}
|
||||
|
|
@ -59,7 +60,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
|
||||
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
|
||||
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection,
|
||||
TransferMode.Binary, connectionId: string.Empty);
|
||||
TransferMode.Binary, connectionId: string.Empty, connection: Mock.Of<IConnection>());
|
||||
connectionToTransport.Writer.TryComplete();
|
||||
await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10));
|
||||
}
|
||||
|
|
@ -78,7 +79,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
|
||||
|
||||
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
|
||||
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection, transferMode, connectionId: string.Empty);
|
||||
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection, transferMode, connectionId: string.Empty, connection: Mock.Of<IConnection>());
|
||||
|
||||
var sendTcs = new TaskCompletionSource<object>();
|
||||
connectionToTransport.Writer.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs));
|
||||
|
|
@ -119,7 +120,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
|
||||
Assert.Null(webSocketsTransport.Mode);
|
||||
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection,
|
||||
transferMode, connectionId: string.Empty).OrTimeout();
|
||||
transferMode, connectionId: string.Empty, connection: Mock.Of<IConnection>()).OrTimeout();
|
||||
Assert.Equal(transferMode, webSocketsTransport.Mode);
|
||||
|
||||
await webSocketsTransport.StopAsync().OrTimeout();
|
||||
|
|
@ -139,7 +140,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
|
||||
var webSocketsTransport = new WebSocketsTransport(httpOptions: null, loggerFactory: loggerFactory);
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
|
||||
webSocketsTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text | TransferMode.Binary, connectionId: string.Empty));
|
||||
webSocketsTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text | TransferMode.Binary, connectionId: string.Empty, connection: Mock.Of<IConnection>()));
|
||||
|
||||
Assert.Contains("Invalid transfer mode.", exception.Message);
|
||||
Assert.Equal("requestedTransferMode", exception.ParamName);
|
||||
|
|
|
|||
Loading…
Reference in New Issue