Client pings server on interval (#2309)

This commit is contained in:
Dylan Dmitri Gray 2018-05-23 13:53:23 -07:00 committed by GitHub
parent 380a153405
commit 736b7f5042
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 276 additions and 71 deletions

View File

@ -10,6 +10,10 @@ import { BinaryMessageFormat } from "./BinaryMessageFormat";
// TypeDoc's @inheritDoc and @link don't work across modules :(
// constant encoding of the ping message
// see: https://github.com/aspnet/SignalR/blob/dev/specs/HubProtocol.md#ping-message-encoding-1
const SERIALIZED_PING_MESSAGE: ArrayBuffer = Uint8Array.from([0x91, MessageType.Ping]).buffer;
/** Implements the MessagePack Hub Protocol */
export class MessagePackHubProtocol implements IHubProtocol {
/** The name of the protocol. This is used by SignalR to resolve the protocol between the client and server. */
@ -50,6 +54,8 @@ export class MessagePackHubProtocol implements IHubProtocol {
case MessageType.StreamItem:
case MessageType.Completion:
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
case MessageType.Ping:
return SERIALIZED_PING_MESSAGE;
default:
throw new Error("Invalid message type.");
}

View File

@ -6,9 +6,11 @@ import { IConnection } from "./IConnection";
import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
import { ILogger, LogLevel } from "./ILogger";
import { IStreamResult } from "./Stream";
import { TextMessageFormat } from "./TextMessageFormat";
import { Arg, Subject } from "./Utils";
const DEFAULT_TIMEOUT_IN_MS: number = 30 * 1000;
const DEFAULT_PING_INTERVAL_IN_MS: number = 15 * 1000;
/** Describes the current state of the {@link HubConnection} to the server. */
export enum HubConnectionState {
@ -20,6 +22,7 @@ export enum HubConnectionState {
/** Represents a connection to a SignalR Hub. */
export class HubConnection {
private readonly cachedPingMessage: string | ArrayBuffer;
private readonly connection: IConnection;
private readonly logger: ILogger;
private protocol: IHubProtocol;
@ -29,6 +32,7 @@ export class HubConnection {
private id: number;
private closedCallbacks: Array<(error?: Error) => void>;
private timeoutHandle: NodeJS.Timer;
private pingServerHandle: NodeJS.Timer;
private receivedHandshakeResponse: boolean;
private connectionState: HubConnectionState;
@ -39,6 +43,13 @@ export class HubConnection {
*/
public serverTimeoutInMilliseconds: number;
/** Default interval at which to ping the server.
*
* The default value is 15,000 milliseconds (15 seconds).
* Allows the server to detect hard disconnects (like when a client unplugs their computer).
*/
public pingIntervalInMilliseconds: number;
/** @internal */
// Using a public static factory method means we can have a private constructor and an _internal_
// create method that can be used by HubConnectionBuilder. An "internal" constructor would just
@ -54,6 +65,7 @@ export class HubConnection {
Arg.isRequired(protocol, "protocol");
this.serverTimeoutInMilliseconds = DEFAULT_TIMEOUT_IN_MS;
this.pingIntervalInMilliseconds = DEFAULT_PING_INTERVAL_IN_MS;
this.logger = logger;
this.protocol = protocol;
@ -68,6 +80,8 @@ export class HubConnection {
this.closedCallbacks = [];
this.id = 0;
this.connectionState = HubConnectionState.Disconnected;
this.cachedPingMessage = this.protocol.writeMessage({ type: MessageType.Ping });
}
/** Indicates the state of the {@link HubConnection} to the server. */
@ -93,13 +107,14 @@ export class HubConnection {
this.logger.log(LogLevel.Debug, "Sending handshake request.");
await this.connection.send(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest));
await this.sendMessage(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest));
this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`);
// defensively cleanup timeout in case we receive a message from the server before we finish start
this.cleanupTimeout();
this.configureTimeout();
this.resetTimeoutPeriod();
this.resetPingInterval();
this.connectionState = HubConnectionState.Connected;
}
@ -112,6 +127,7 @@ export class HubConnection {
this.logger.log(LogLevel.Debug, "Stopping HubConnection.");
this.cleanupTimeout();
this.cleanupPingTimer();
return this.connection.stop();
}
@ -131,7 +147,7 @@ export class HubConnection {
delete this.callbacks[invocationDescriptor.invocationId];
return this.connection.send(cancelMessage);
return this.sendMessage(cancelMessage);
});
this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage, error?: Error) => {
@ -153,7 +169,7 @@ export class HubConnection {
const message = this.protocol.writeMessage(invocationDescriptor);
this.connection.send(message)
this.sendMessage(message)
.catch((e) => {
subject.error(e);
delete this.callbacks[invocationDescriptor.invocationId];
@ -162,6 +178,11 @@ export class HubConnection {
return subject;
}
private sendMessage(message: any) {
this.resetPingInterval();
return this.connection.send(message);
}
/** Invokes a hub method on the server using the specified name and arguments. Does not wait for a response from the receiver.
*
* The Promise returned by this method resolves when the client has sent the invocation to the server. The server may still
@ -176,7 +197,7 @@ export class HubConnection {
const message = this.protocol.writeMessage(invocationDescriptor);
return this.connection.send(message);
return this.sendMessage(message);
}
/** Invokes a hub method on the server using the specified name and arguments.
@ -213,7 +234,7 @@ export class HubConnection {
const message = this.protocol.writeMessage(invocationDescriptor);
this.connection.send(message)
this.sendMessage(message)
.catch((e) => {
reject(e);
delete this.callbacks[invocationDescriptor.invocationId];
@ -337,7 +358,7 @@ export class HubConnection {
}
}
this.configureTimeout();
this.resetTimeoutPeriod();
}
private processHandshakeResponse(data: any): any {
@ -365,7 +386,12 @@ export class HubConnection {
return remainingData;
}
private configureTimeout() {
private resetPingInterval() {
this.cleanupPingTimer();
this.pingServerHandle = setTimeout(() => this.sendMessage(this.cachedPingMessage), this.pingIntervalInMilliseconds);
}
private resetTimeoutPeriod() {
if (!this.connection.features || !this.connection.features.inherentKeepAlive) {
// Set the timeout timer
this.timeoutHandle = setTimeout(() => this.serverTimeout(), this.serverTimeoutInMilliseconds);
@ -406,10 +432,17 @@ export class HubConnection {
});
this.cleanupTimeout();
this.cleanupPingTimer();
this.closedCallbacks.forEach((c) => c.apply(this, [error]));
}
private cleanupPingTimer(): void {
if (this.pingServerHandle) {
clearTimeout(this.pingServerHandle);
}
}
private cleanupTimeout(): void {
if (this.timeoutHandle) {
clearTimeout(this.timeoutHandle);

View File

@ -267,7 +267,7 @@ describe("HttpConnection", () => {
});
}
it(`cannot be started if server's only transport (${HttpTransportType[requestedTransport]}) is masked out by the transport option`, async() => {
it(`cannot be started if server's only transport (${HttpTransportType[requestedTransport]}) is masked out by the transport option`, async () => {
const negotiateResponse = {
availableTransports: [
{ transport: "WebSockets", transferFormats: [ "Text", "Binary" ] },

View File

@ -48,6 +48,25 @@ describe("HubConnection", () => {
});
});
describe("ping", () => {
it("automatically sends multiple pings", async () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection);
hubConnection.pingIntervalInMilliseconds = 5;
try {
await hubConnection.start();
await delay(32);
const numPings = connection.sentData.filter((s) => JSON.parse(s).type === MessageType.Ping).length;
expect(numPings).toBeGreaterThanOrEqual(2);
} finally {
await hubConnection.stop();
}
});
});
describe("stop", () => {
it("state disconnected", async () => {
const connection = new TestConnection();
@ -870,7 +889,7 @@ describe("HubConnection", () => {
hubConnection.onclose((e) => state = hubConnection.state);
// Typically this would be called by the transport
connection.onclose();
expect(state).toBe(HubConnectionState.Disconnected);
} finally {
hubConnection.stop();

View File

@ -201,7 +201,8 @@ class TestProtocol implements IHubProtocol {
throw new Error("Method not implemented.");
}
public writeMessage(message: HubMessage): string | ArrayBuffer {
throw new Error("Method not implemented.");
// builds ping message in the `hubConnection` constructor
return "";
}
}

View File

@ -6,7 +6,7 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Http.Connections.Internal
namespace Microsoft.AspNetCore.Internal
{
internal class TimerAwaitable : IDisposable, ICriticalNotifyCompletion
{

View File

@ -13,6 +13,7 @@ using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Internal;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;

View File

@ -9,6 +9,7 @@
<Compile Include="..\Common\AwaitableThreadPool.cs" Link="AwaitableThreadPool.cs" />
<Compile Include="..\Common\MemoryBufferWriter.cs" Link="MemoryBufferWriter.cs" />
<Compile Include="..\Common\PipeWriterStream.cs" Link="PipeWriterStream.cs" />
<Compile Include="..\Common\TimerAwaitable.cs" Link="Internal\TimerAwaitable.cs" />
<Compile Include="..\Common\WebSocketExtensions.cs" Link="WebSocketExtensions.cs" />
<Compile Include="..\Common\StreamExtensions.cs" Link="StreamExtensions.cs" />
<Compile Include="..\Common\DuplexPipe.cs" Link="DuplexPipe.cs" />

View File

@ -174,6 +174,18 @@ namespace Microsoft.AspNetCore.SignalR.Client
private static readonly Action<ILogger, string, Exception> _removingHandlers =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(58, "RemovingHandlers"), "Removing handlers for client method '{MethodName}'.");
private static readonly Action<ILogger, string, Exception> _sendingMessageGeneric =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(59, "SendingMessageGeneric"), "Sending {MessageType} message.");
private static readonly Action<ILogger, string, Exception> _messageSentGeneric =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(60, "MessageSentGeneric"), "Sending {MessageType} message completed.");
private static readonly Action<ILogger, Exception> _acquiredConnectionLockForPing =
LoggerMessage.Define(LogLevel.Trace, new EventId(61, "AcquiredConnectionLockForPing"), "Acquired the Connection Lock in order to ping the server.");
private static readonly Action<ILogger, Exception> _unableToAcquireConnectionLockForPing =
LoggerMessage.Define(LogLevel.Trace, new EventId(62, "UnableToAcquireConnectionLockForPing"), "Skipping ping because a send is already in progress.");
public static void PreparingNonBlockingInvocation(ILogger logger, string target, int count)
{
_preparingNonBlockingInvocation(logger, target, count, null);
@ -203,19 +215,33 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
public static void SendingMessage(ILogger logger, HubInvocationMessage message)
public static void SendingMessage(ILogger logger, HubMessage message)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_sendingMessage(logger, message.GetType().Name, message.InvocationId, null);
if (message is HubInvocationMessage invocationMessage)
{
_sendingMessage(logger, message.GetType().Name, invocationMessage.InvocationId, null);
}
else
{
_sendingMessageGeneric(logger, message.GetType().Name, null);
}
}
}
public static void MessageSent(ILogger logger, HubInvocationMessage message)
public static void MessageSent(ILogger logger, HubMessage message)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_messageSent(logger, message.GetType().Name, message.InvocationId, null);
if (message is HubInvocationMessage invocationMessage)
{
_messageSent(logger, message.GetType().Name, invocationMessage.InvocationId, null);
}
else
{
_messageSentGeneric(logger, message.GetType().Name, null);
}
}
}
@ -460,6 +486,16 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
_argumentBindingFailure(logger, invocationId, target, exception);
}
public static void AcquiredConnectionLockForPing(ILogger logger)
{
_acquiredConnectionLockForPing(logger, null);
}
public static void UnableToAcquireConnectionLockForPing(ILogger logger)
{
_unableToAcquireConnectionLockForPing(logger, null);
}
}
}
}

View File

@ -33,6 +33,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public static readonly TimeSpan DefaultServerTimeout = TimeSpan.FromSeconds(30); // Server ping rate is 15 sec, this is 2 times that.
public static readonly TimeSpan DefaultHandshakeTimeout = TimeSpan.FromSeconds(15);
public static readonly TimeSpan DefaultPingInterval = TimeSpan.FromSeconds(15);
public static readonly TimeSpan DefaultTickRate = TimeSpan.FromSeconds(1);
// This lock protects the connection state.
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);
@ -44,6 +46,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
private readonly IServiceProvider _serviceProvider;
private readonly IConnectionFactory _connectionFactory;
private readonly ConcurrentDictionary<string, InvocationHandlerList> _handlers = new ConcurrentDictionary<string, InvocationHandlerList>(StringComparer.Ordinal);
private long _nextActivationServerTimeout;
private long _nextActivationSendPing;
private bool _disposed;
// Transient state to a connection
@ -51,11 +55,28 @@ namespace Microsoft.AspNetCore.SignalR.Client
public event Func<Exception, Task> Closed;
// internal for testing purposes
internal TimeSpan TickRate { get; set; } = DefaultTickRate;
/// <summary>
/// Gets or sets the server timeout interval for the connection. Changes to this value
/// will not be applied until the Keep Alive timer is next reset.
/// Gets or sets the server timeout interval for the connection.
/// </summary>
/// <remarks>
/// The client times out if it hasn't heard from the server for `this` long.
/// </remarks>
public TimeSpan ServerTimeout { get; set; } = DefaultServerTimeout;
/// <summary>
/// Gets or sets the interval at which the client sends ping messages.
/// </summary>
/// <remarks>
/// Sending any message resets the timer to the start of the interval.
/// </remarks>
public TimeSpan PingInterval { get; set; } = DefaultPingInterval;
/// <summary>
/// Gets or sets the timeout for the initial handshake.
/// </summary>
public TimeSpan HandshakeTimeout { get; set; } = DefaultHandshakeTimeout;
/// <summary>
@ -163,7 +184,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// It's OK to be disposed while registering a callback, we'll just never call the callback anyway (as with all the callbacks registered before disposal).
var invocationHandler = new InvocationHandler(parameterTypes, handler, state);
var invocationList = _handlers.AddOrUpdate(methodName, _ => new InvocationHandlerList(invocationHandler) ,
var invocationList = _handlers.AddOrUpdate(methodName, _ => new InvocationHandlerList(invocationHandler),
(_, invocations) =>
{
lock (invocations)
@ -175,6 +196,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
return new Subscription(invocationHandler, invocationList);
}
/// <summary>
/// Removes all handlers associated with the method with the specified method name.
/// </summary>
@ -274,6 +296,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// Set this at the end to avoid setting internal state until the connection is real
_connectionState = startingConnectionState;
_connectionState.ReceiveTask = ReceiveLoop(_connectionState);
Log.Started(_logger);
}
finally
@ -465,7 +488,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
private async Task SendHubMessage(HubInvocationMessage hubMessage, CancellationToken cancellationToken = default)
private async Task SendHubMessage(HubMessage hubMessage, CancellationToken cancellationToken = default)
{
AssertConnectionValid();
@ -476,6 +499,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
// REVIEW: If a token is passed in and is canceled during FlushAsync it seems to break .Complete()...
await _connectionState.Connection.Transport.Output.FlushAsync();
// We've sent a message, so don't ping for a while
ResetSendPing();
Log.MessageSent(_logger, hubMessage);
}
@ -703,7 +729,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.ReceiveLoopStarting(_logger);
var timeoutTimer = StartTimeoutTimer(connectionState);
// Performs periodic tasks -- here sending pings and checking timeout
// Disposed with `timer.Stop()` in the finally block below
var timer = new TimerAwaitable(TickRate, TickRate);
_ = TimerLoop(timer);
try
{
@ -721,7 +750,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
else if (!buffer.IsEmpty)
{
ResetTimeoutTimer(timeoutTimer);
Log.ResettingKeepAliveTimer(_logger);
ResetTimeout();
Log.ProcessingMessage(_logger, buffer.Length);
@ -771,6 +801,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.ServerDisconnectedWithError(_logger, ex);
connectionState.CloseException = ex;
}
finally
{
timer.Stop();
}
// Clear the connectionState field
await WaitConnectionLockAsync();
@ -785,9 +819,6 @@ namespace Microsoft.AspNetCore.SignalR.Client
ReleaseConnectionLock();
}
// Stop the timeout timer.
timeoutTimer?.Dispose();
// Dispose the connection
await CloseAsync(connectionState.Connection);
@ -814,6 +845,77 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
public void ResetSendPing()
{
Volatile.Write(ref _nextActivationSendPing, (DateTime.UtcNow + PingInterval).Ticks);
}
public void ResetTimeout()
{
Volatile.Write(ref _nextActivationServerTimeout, (DateTime.UtcNow + ServerTimeout).Ticks);
}
private async Task TimerLoop(TimerAwaitable timer)
{
// initialize the timers
timer.Start();
ResetSendPing();
ResetTimeout();
using (timer)
{
// await returns True until `timer.Stop()` is called in the `finally` block of `ReceiveLoop`
while (await timer)
{
if (DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationServerTimeout))
{
OnServerTimeout();
}
if (DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationSendPing))
{
await PingServer();
}
}
}
}
private void OnServerTimeout()
{
if (Debugger.IsAttached)
{
return;
}
_connectionState.CloseException = new TimeoutException(
$"Server timeout ({ServerTimeout.TotalMilliseconds:0.00}ms) elapsed without receiving a message from the server.");
_connectionState.Connection.Transport.Input.CancelPendingRead();
}
private async Task PingServer()
{
if (_disposed || !_connectionLock.Wait(0))
{
Log.UnableToAcquireConnectionLockForPing(_logger);
return;
}
Log.AcquiredConnectionLockForPing(_logger);
try
{
if (_disposed || _connectionState == null || _connectionState.Stopping)
{
return;
}
await SendHubMessage(PingMessage.Instance);
}
finally
{
ReleaseConnectionLock();
}
}
private async Task RunClosedEvent(Func<Exception, Task> closed, Exception closeException)
{
// Dispatch to the thread pool before we invoke the user callback
@ -830,48 +932,6 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
private void ResetTimeoutTimer(Timer timeoutTimer)
{
if (timeoutTimer != null)
{
Log.ResettingKeepAliveTimer(_logger);
timeoutTimer.Change(ServerTimeout, Timeout.InfiniteTimeSpan);
}
}
private Timer StartTimeoutTimer(ConnectionState connectionState)
{
// Check if we need keep-alive
Timer timeoutTimer = null;
// We use '!== true' because it could be null, which we treat as false.
if (connectionState.Connection.Features.Get<IConnectionInherentKeepAliveFeature>()?.HasInherentKeepAlive != true)
{
Log.StartingServerTimeoutTimer(_logger, ServerTimeout);
timeoutTimer = new Timer(
state => OnTimeout((ConnectionState)state),
connectionState,
dueTime: ServerTimeout,
period: Timeout.InfiniteTimeSpan);
}
else
{
Log.NotUsingServerTimeout(_logger);
}
return timeoutTimer;
}
private void OnTimeout(ConnectionState connectionState)
{
if (!Debugger.IsAttached)
{
connectionState.CloseException = new TimeoutException(
$"Server timeout ({ServerTimeout.TotalMilliseconds:0.00}ms) elapsed without receiving a message from the server.");
connectionState.Connection.Transport.Input.CancelPendingRead();
}
}
private void CheckConnectionActive(string methodName)
{
if (_connectionState == null || _connectionState.Stopping)

View File

@ -10,6 +10,7 @@
<Compile Include="..\Common\AwaitableThreadPool.cs" Link="AwaitableThreadPool.cs" />
<Compile Include="..\Common\ForceAsyncAwaiter.cs" Link="ForceAsyncAwaiter.cs" />
<Compile Include="..\Common\PipeWriterStream.cs" Link="PipeWriterStream.cs" />
<Compile Include="..\Common\TimerAwaitable.cs" Link="Internal\TimerAwaitable.cs" />
</ItemGroup>
<ItemGroup>

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using Xunit;
@ -532,7 +533,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact]
public async Task PartialInvocationWorks()
{
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection);
try
@ -565,6 +566,32 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task ClientPingsMultipleTimes()
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection);
hubConnection.TickRate = TimeSpan.FromMilliseconds(30);
hubConnection.PingInterval = TimeSpan.FromMilliseconds(80);
try
{
await hubConnection.StartAsync().OrTimeout();
var firstPing = await connection.ReadSentTextMessageAsync().OrTimeout(TimeSpan.FromMilliseconds(200));
Assert.Equal("{\"type\":6}", firstPing);
var secondPing = await connection.ReadSentTextMessageAsync().OrTimeout(TimeSpan.FromMilliseconds(200));
Assert.Equal("{\"type\":6}", secondPing);
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
}
}
}

View File

@ -24,7 +24,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
private readonly TaskCompletionSource<object> _disposed = new TaskCompletionSource<object>();
private int _disposeCount = 0;
public Task Started => _started.Task;
public Task Disposed => _disposed.Task;
@ -117,7 +116,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
return Application.Output.WriteAsync(bytes).AsTask();
}
public async Task<string> ReadSentTextMessageAsync()
{
// Read a single text message from the Application Input pipe
@ -136,7 +134,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
else if (result.IsCompleted)
{
throw new InvalidOperationException("Out of data!");
return null;
}
}
finally
@ -146,6 +144,28 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
public async Task<IList<string>> ReadAllSentMessagesAsync()
{
if (!Disposed.IsCompleted)
{
throw new InvalidOperationException("The connection must be stopped before this method can be used.");
}
var results = new List<string>();
while (true)
{
var message = await ReadSentTextMessageAsync();
if (message == null)
{
break;
}
results.Add(message);
}
return results;
}
public void CompleteFromTransport(Exception ex = null)
{
Application.Output.Complete(ex);