diff --git a/samples/ClientSample/HubSample.cs b/samples/ClientSample/HubSample.cs index 5a127314dc..a5ffe86191 100644 --- a/samples/ClientSample/HubSample.cs +++ b/samples/ClientSample/HubSample.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.IO; using System.Linq; using System.Net; using System.Threading; @@ -34,7 +35,6 @@ namespace ClientSample .WithLogging(logging => { logging.AddConsole(); - logging.SetMinimumLevel(LogLevel.Trace); }); if (uri.Scheme == "net.tcp") @@ -48,59 +48,83 @@ namespace ClientSample var connection = connectionBuilder.Build(); - try + Console.CancelKeyPress += (sender, a) => { - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => closeTcs.SetResult(null); - // Set up handler - connection.On("Send", Console.WriteLine); + a.Cancel = true; + connection.DisposeAsync().GetAwaiter().GetResult(); + }; - await ConnectAsync(connection); + // Set up handler + connection.On("Send", Console.WriteLine); - Console.WriteLine("Connected to {0}", uri); + CancellationTokenSource closedTokenSource = null; - var sendCts = new CancellationTokenSource(); + connection.Closed += e => + { + // This should never be null by the time this fires + closedTokenSource.Cancel(); - Console.CancelKeyPress += async (sender, a) => + Console.WriteLine("Connection closed..."); + }; + + while (true) + { + // Dispose the previous token + closedTokenSource?.Dispose(); + + // Create a new token for this run + closedTokenSource = new CancellationTokenSource(); + + // Connect to the server + if (!await ConnectAsync(connection)) { - a.Cancel = true; - Console.WriteLine("Stopping loops..."); - sendCts.Cancel(); - await connection.DisposeAsync(); - }; + break; + } - while (!closeTcs.Task.IsCompleted) + Console.WriteLine("Connected to {0}", uri); ; + + // Handle the connected connection + while (true) { - var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), closeTcs.Task); - if (completedTask == closeTcs.Task) + try { + var line = Console.ReadLine(); + + if (line == null || closedTokenSource.Token.IsCancellationRequested) + { + break; + } + + await connection.InvokeAsync("Send", line); + } + catch (IOException) + { + // Process being shutdown break; } - - var line = await (Task)completedTask; - - if (line == null) + catch (OperationCanceledException) { + // The connection closed + break; + } + catch (ObjectDisposedException) + { + // We're shutting down the client + break; + } + catch (Exception ex) + { + // Send could have failed because the connection closed + System.Console.WriteLine(ex); break; } - - await connection.InvokeAsync("Send", line, sendCts.Token); } } - catch (AggregateException aex) when (aex.InnerExceptions.All(e => e is OperationCanceledException)) - { - } - catch (OperationCanceledException) - { - } - finally - { - await connection.DisposeAsync(); - } + return 0; } - private static async Task ConnectAsync(HubConnection connection) + private static async Task ConnectAsync(HubConnection connection) { // Keep trying to until we can start while (true) @@ -108,11 +132,18 @@ namespace ClientSample try { await connection.StartAsync(); - return; + return true; + } + catch (ObjectDisposedException) + { + // Client side killed the connection + return false; } catch (Exception) { - await Task.Delay(1000); + Console.WriteLine("Failed to connect, trying again in 5000(ms)"); + + await Task.Delay(5000); } } } diff --git a/samples/ClientSample/Tcp/TcpConnection.cs b/samples/ClientSample/Tcp/TcpConnection.cs index d2656c1f80..c59e73a0ad 100644 --- a/samples/ClientSample/Tcp/TcpConnection.cs +++ b/samples/ClientSample/Tcp/TcpConnection.cs @@ -4,13 +4,15 @@ using System.IO; using System.IO.Pipelines; using System.Net; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Http.Features; namespace ClientSample { - public class TcpConnection : ConnectionContext + public class TcpConnection : ConnectionContext, IConnectionInherentKeepAliveFeature { private readonly Socket _socket; private volatile bool _aborted; @@ -26,6 +28,10 @@ namespace ClientSample _sender = new SocketSender(_socket, PipeScheduler.ThreadPool); _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); + + // Add IConnectionInherentKeepAliveFeature to the tcp connection impl since Kestrel doesn't implement + // the IConnectionHeartbeatFeature + Features.Set(this); } public override IDuplexPipe Transport { get; set; } @@ -34,6 +40,8 @@ namespace ClientSample public override string ConnectionId { get; set; } = Guid.NewGuid().ToString(); public override IDictionary Items { get; set; } = new ConnectionItems(); + public TimeSpan KeepAliveInterval => Timeout.InfiniteTimeSpan; + public Task DisposeAsync() { Transport?.Output.Complete();