From e9d58154ec0de3bfca1e7eebafb05bab1d151d0b Mon Sep 17 00:00:00 2001 From: David Fowler Date: Mon, 2 Apr 2018 09:50:50 -0700 Subject: [PATCH] Added TCP client sample to HubSample (#1805) - Ripped of Kestrel's SocketConnection to make a TcpConnection IConnection implementation. - Fixed issue with SignalR assuming there will always be a non-null user on the ConnectionContext. --- samples/ClientSample/ClientSample.csproj | 4 + samples/ClientSample/HubSample.cs | 12 +- samples/ClientSample/Tcp/BufferExtensions.cs | 24 ++ samples/ClientSample/Tcp/SocketAwaitable.cs | 71 +++++ samples/ClientSample/Tcp/SocketReceiver.cs | 40 +++ samples/ClientSample/Tcp/SocketSender.cs | 100 ++++++++ samples/ClientSample/Tcp/TcpConnection.cs | 242 ++++++++++++++++++ .../Tcp/TcpHubConnectionBuilderExtensions.cs | 15 ++ samples/SignalRSamples/Program.cs | 4 +- .../DefaultUserIdProvider.cs | 2 +- 10 files changed, 505 insertions(+), 9 deletions(-) create mode 100644 samples/ClientSample/Tcp/BufferExtensions.cs create mode 100644 samples/ClientSample/Tcp/SocketAwaitable.cs create mode 100644 samples/ClientSample/Tcp/SocketReceiver.cs create mode 100644 samples/ClientSample/Tcp/SocketSender.cs create mode 100644 samples/ClientSample/Tcp/TcpConnection.cs create mode 100644 samples/ClientSample/Tcp/TcpHubConnectionBuilderExtensions.cs diff --git a/samples/ClientSample/ClientSample.csproj b/samples/ClientSample/ClientSample.csproj index f38b39eb13..966067f59b 100644 --- a/samples/ClientSample/ClientSample.csproj +++ b/samples/ClientSample/ClientSample.csproj @@ -7,6 +7,10 @@ Exe + + + + diff --git a/samples/ClientSample/HubSample.cs b/samples/ClientSample/HubSample.cs index 93def0cbb7..a480e65430 100644 --- a/samples/ClientSample/HubSample.cs +++ b/samples/ClientSample/HubSample.cs @@ -3,6 +3,7 @@ using System; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR.Client; @@ -27,12 +28,11 @@ namespace ClientSample public static async Task ExecuteAsync(string baseUrl) { - baseUrl = string.IsNullOrEmpty(baseUrl) ? "http://localhost:5000/default" : baseUrl; - - Console.WriteLine("Connecting to {0}", baseUrl); + var endpoint = new IPEndPoint(IPAddress.Loopback, 9001); + Console.WriteLine("Connecting to {0}", endpoint); var connection = new HubConnectionBuilder() - .WithUrl(baseUrl) - .WithConsoleLogger(LogLevel.Trace) + .WithEndPoint(endpoint) + .WithConsoleLogger(LogLevel.Information) .Build(); try @@ -44,7 +44,7 @@ namespace ClientSample await ConnectAsync(connection); - Console.WriteLine("Connected to {0}", baseUrl); + Console.WriteLine("Connected to {0}", endpoint); var sendCts = new CancellationTokenSource(); diff --git a/samples/ClientSample/Tcp/BufferExtensions.cs b/samples/ClientSample/Tcp/BufferExtensions.cs new file mode 100644 index 0000000000..9a74e15ba0 --- /dev/null +++ b/samples/ClientSample/Tcp/BufferExtensions.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Text; + +namespace System +{ + public static class BufferExtensions + { + public static ArraySegment GetArray(this Memory memory) + { + return ((ReadOnlyMemory)memory).GetArray(); + } + + public static ArraySegment GetArray(this ReadOnlyMemory memory) + { + if (!MemoryMarshal.TryGetArray(memory, out var result)) + { + throw new InvalidOperationException("Buffer backed by array was expected"); + } + return result; + } + } +} diff --git a/samples/ClientSample/Tcp/SocketAwaitable.cs b/samples/ClientSample/Tcp/SocketAwaitable.cs new file mode 100644 index 0000000000..d7b70425d6 --- /dev/null +++ b/samples/ClientSample/Tcp/SocketAwaitable.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ClientSample +{ + public class SocketAwaitable : ICriticalNotifyCompletion + { + private static readonly Action _callbackCompleted = () => { }; + + private readonly PipeScheduler _ioScheduler; + + private Action _callback; + private int _bytesTransferred; + private SocketError _error; + + public SocketAwaitable(PipeScheduler ioScheduler) + { + _ioScheduler = ioScheduler; + } + + public SocketAwaitable GetAwaiter() => this; + public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); + + public int GetResult() + { + Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); + + _callback = null; + + if (_error != SocketError.Success) + { + throw new SocketException((int)_error); + } + + return _bytesTransferred; + } + + public void OnCompleted(Action continuation) + { + if (ReferenceEquals(_callback, _callbackCompleted) || + ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) + { + Task.Run(continuation); + } + } + + public void UnsafeOnCompleted(Action continuation) + { + OnCompleted(continuation); + } + + public void Complete(int bytesTransferred, SocketError socketError) + { + _error = socketError; + _bytesTransferred = bytesTransferred; + var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); + + if (continuation != null) + { + _ioScheduler.Schedule(state => ((Action)state)(), continuation); + } + } + } +} diff --git a/samples/ClientSample/Tcp/SocketReceiver.cs b/samples/ClientSample/Tcp/SocketReceiver.cs new file mode 100644 index 0000000000..8c6421f078 --- /dev/null +++ b/samples/ClientSample/Tcp/SocketReceiver.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Text; + +namespace ClientSample +{ + public class SocketReceiver + { + private readonly Socket _socket; + private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); + private readonly SocketAwaitable _awaitable; + + public SocketReceiver(Socket socket, PipeScheduler scheduler) + { + _socket = socket; + _awaitable = new SocketAwaitable(scheduler); + _eventArgs.UserToken = _awaitable; + _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); + } + + public SocketAwaitable ReceiveAsync(Memory buffer) + { +#if NETCOREAPP2_1 + _eventArgs.SetBuffer(buffer); +#else + var segment = buffer.GetArray(); + + _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); +#endif + if (!_socket.ReceiveAsync(_eventArgs)) + { + _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); + } + + return _awaitable; + } + } +} diff --git a/samples/ClientSample/Tcp/SocketSender.cs b/samples/ClientSample/Tcp/SocketSender.cs new file mode 100644 index 0000000000..7fa1c20ac5 --- /dev/null +++ b/samples/ClientSample/Tcp/SocketSender.cs @@ -0,0 +1,100 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Runtime.InteropServices; +using System.Text; + +namespace ClientSample +{ + public class SocketSender + { + private readonly Socket _socket; + private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); + private readonly SocketAwaitable _awaitable; + + private List> _bufferList; + + public SocketSender(Socket socket, PipeScheduler scheduler) + { + _socket = socket; + _awaitable = new SocketAwaitable(scheduler); + _eventArgs.UserToken = _awaitable; + _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); + } + + public SocketAwaitable SendAsync(ReadOnlySequence buffers) + { + if (buffers.IsSingleSegment) + { + return SendAsync(buffers.First); + } + +#if NETCOREAPP2_1 + if (!_eventArgs.MemoryBuffer.Equals(Memory.Empty)) +#else + if (_eventArgs.Buffer != null) +#endif + { + _eventArgs.SetBuffer(null, 0, 0); + } + + _eventArgs.BufferList = GetBufferList(buffers); + + if (!_socket.SendAsync(_eventArgs)) + { + _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); + } + + return _awaitable; + } + + private SocketAwaitable SendAsync(ReadOnlyMemory memory) + { + // The BufferList getter is much less expensive then the setter. + if (_eventArgs.BufferList != null) + { + _eventArgs.BufferList = null; + } + +#if NETCOREAPP2_1 + _eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory)); +#else + var segment = memory.GetArray(); + + _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); +#endif + if (!_socket.SendAsync(_eventArgs)) + { + _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); + } + + return _awaitable; + } + + private List> GetBufferList(ReadOnlySequence buffer) + { + Debug.Assert(!buffer.IsEmpty); + Debug.Assert(!buffer.IsSingleSegment); + + if (_bufferList == null) + { + _bufferList = new List>(); + } + else + { + // Buffers are pooled, so it's OK to root them until the next multi-buffer write. + _bufferList.Clear(); + } + + foreach (var b in buffer) + { + _bufferList.Add(b.GetArray()); + } + + return _bufferList; + } + } +} diff --git a/samples/ClientSample/Tcp/TcpConnection.cs b/samples/ClientSample/Tcp/TcpConnection.cs new file mode 100644 index 0000000000..a053486a7e --- /dev/null +++ b/samples/ClientSample/Tcp/TcpConnection.cs @@ -0,0 +1,242 @@ +using System; +using System.IO; +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Http.Features; + +namespace ClientSample +{ + public class TcpConnection : IConnection + { + private readonly Socket _socket; + private volatile bool _aborted; + private readonly EndPoint _endPoint; + private IDuplexPipe _transport; + private IDuplexPipe _application; + private SocketSender _sender; + private SocketReceiver _receiver; + + public TcpConnection(EndPoint endPoint) + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + _endPoint = endPoint; + + _sender = new SocketSender(_socket, PipeScheduler.ThreadPool); + _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); + } + + public IDuplexPipe Transport => _transport; + + public IFeatureCollection Features { get; } = new FeatureCollection(); + + public Task DisposeAsync() + { + _transport?.Output.Complete(); + _transport?.Input.Complete(); + + _socket?.Dispose(); + + return Task.CompletedTask; + } + + public async Task StartAsync() + { + await _socket.ConnectAsync(_endPoint); + + var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); + + _transport = pair.Transport; + _application = pair.Application; + + _ = ExecuteAsync(); + } + + private async Task ExecuteAsync() + { + Exception sendError = null; + try + { + // Spawn send and receive logic + Task receiveTask = DoReceive(); + Task sendTask = DoSend(); + + // If the sending task completes then close the receive + // We don't need to do this in the other direction because the kestrel + // will trigger the output closing once the input is complete. + if (await Task.WhenAny(receiveTask, sendTask) == sendTask) + { + // Tell the reader it's being aborted + _socket.Dispose(); + } + + // Now wait for both to complete + await receiveTask; + sendError = await sendTask; + + // Dispose the socket(should noop if already called) + _socket.Dispose(); + } + catch (Exception ex) + { + Console.WriteLine($"Unexpected exception in {nameof(TcpConnection)}.{nameof(StartAsync)}: " + ex); + } + finally + { + // Complete the output after disposing the socket + _application.Input.Complete(sendError); + } + } + private async Task DoReceive() + { + Exception error = null; + + try + { + await ProcessReceives(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) + { + error = new ConnectionResetException(ex.Message, ex); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted || + ex.SocketErrorCode == SocketError.ConnectionAborted || + ex.SocketErrorCode == SocketError.Interrupted || + ex.SocketErrorCode == SocketError.InvalidArgument) + { + if (!_aborted) + { + // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix. + error = new ConnectionAbortedException(); + } + } + catch (ObjectDisposedException) + { + if (!_aborted) + { + error = new ConnectionAbortedException(); + } + } + catch (IOException ex) + { + error = ex; + } + catch (Exception ex) + { + error = new IOException(ex.Message, ex); + } + finally + { + if (_aborted) + { + error = error ?? new ConnectionAbortedException(); + } + + _application.Output.Complete(error); + } + } + + private async Task ProcessReceives() + { + while (true) + { + // Ensure we have some reasonable amount of buffer space + var buffer = _application.Output.GetMemory(); + + var bytesReceived = await _receiver.ReceiveAsync(buffer); + + if (bytesReceived == 0) + { + // FIN + break; + } + + _application.Output.Advance(bytesReceived); + + var flushTask = _application.Output.FlushAsync(); + + if (!flushTask.IsCompleted) + { + await flushTask; + } + + var result = flushTask.GetAwaiter().GetResult(); + if (result.IsCompleted) + { + // Pipe consumer is shut down, do we stop writing + break; + } + } + } + + private async Task DoSend() + { + Exception error = null; + + try + { + await ProcessSends(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) + { + error = null; + } + catch (ObjectDisposedException) + { + error = null; + } + catch (IOException ex) + { + error = ex; + } + catch (Exception ex) + { + error = new IOException(ex.Message, ex); + } + finally + { + _aborted = true; + _socket.Shutdown(SocketShutdown.Both); + } + + return error; + } + + private async Task ProcessSends() + { + while (true) + { + // Wait for data to write from the pipe producer + var result = await _application.Input.ReadAsync(); + var buffer = result.Buffer; + + if (result.IsCanceled) + { + break; + } + + var end = buffer.End; + var isCompleted = result.IsCompleted; + if (!buffer.IsEmpty) + { + await _sender.SendAsync(buffer); + } + + _application.Input.AdvanceTo(end); + + if (isCompleted) + { + break; + } + } + } + + public Task StartAsync(TransferFormat transferFormat) + { + // Transfer format is irrelevant + return StartAsync(); + } + } +} diff --git a/samples/ClientSample/Tcp/TcpHubConnectionBuilderExtensions.cs b/samples/ClientSample/Tcp/TcpHubConnectionBuilderExtensions.cs new file mode 100644 index 0000000000..775ef6f6ab --- /dev/null +++ b/samples/ClientSample/Tcp/TcpHubConnectionBuilderExtensions.cs @@ -0,0 +1,15 @@ +using System.Net; +using ClientSample; + +namespace Microsoft.AspNetCore.SignalR.Client +{ + public static class TcpHubConnectionBuilderExtensions + { + public static IHubConnectionBuilder WithEndPoint(this IHubConnectionBuilder builder, IPEndPoint endPoint) + { + builder.ConfigureConnectionFactory(() => new TcpConnection(endPoint)); + + return builder; + } + } +} diff --git a/samples/SignalRSamples/Program.cs b/samples/SignalRSamples/Program.cs index 7b3cd23722..2f932a18a4 100644 --- a/samples/SignalRSamples/Program.cs +++ b/samples/SignalRSamples/Program.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.IO; +using System.Net; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Configuration; @@ -31,9 +32,8 @@ namespace SignalRSamples options.ListenLocalhost(5000); // Hub bound to TCP end point - options.ListenLocalhost(9001, builder => + options.Listen(IPAddress.Any, 9001, builder => { - // Run the hub on this port (this won't work properly until streaming parsing is implemented) builder.UseHub(); }); }) diff --git a/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs b/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs index b297431276..6d04744284 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs @@ -9,7 +9,7 @@ namespace Microsoft.AspNetCore.SignalR.Core { public string GetUserId(HubConnectionContext connection) { - return connection.User.FindFirst(ClaimTypes.NameIdentifier)?.Value; + return connection.User?.FindFirst(ClaimTypes.NameIdentifier)?.Value; } } } \ No newline at end of file