diff --git a/samples/ClientSample/ClientSample.csproj b/samples/ClientSample/ClientSample.csproj
index f38b39eb13..966067f59b 100644
--- a/samples/ClientSample/ClientSample.csproj
+++ b/samples/ClientSample/ClientSample.csproj
@@ -7,6 +7,10 @@
Exe
+
+
+
+
diff --git a/samples/ClientSample/HubSample.cs b/samples/ClientSample/HubSample.cs
index 93def0cbb7..a480e65430 100644
--- a/samples/ClientSample/HubSample.cs
+++ b/samples/ClientSample/HubSample.cs
@@ -3,6 +3,7 @@
using System;
using System.Linq;
+using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Client;
@@ -27,12 +28,11 @@ namespace ClientSample
public static async Task ExecuteAsync(string baseUrl)
{
- baseUrl = string.IsNullOrEmpty(baseUrl) ? "http://localhost:5000/default" : baseUrl;
-
- Console.WriteLine("Connecting to {0}", baseUrl);
+ var endpoint = new IPEndPoint(IPAddress.Loopback, 9001);
+ Console.WriteLine("Connecting to {0}", endpoint);
var connection = new HubConnectionBuilder()
- .WithUrl(baseUrl)
- .WithConsoleLogger(LogLevel.Trace)
+ .WithEndPoint(endpoint)
+ .WithConsoleLogger(LogLevel.Information)
.Build();
try
@@ -44,7 +44,7 @@ namespace ClientSample
await ConnectAsync(connection);
- Console.WriteLine("Connected to {0}", baseUrl);
+ Console.WriteLine("Connected to {0}", endpoint);
var sendCts = new CancellationTokenSource();
diff --git a/samples/ClientSample/Tcp/BufferExtensions.cs b/samples/ClientSample/Tcp/BufferExtensions.cs
new file mode 100644
index 0000000000..9a74e15ba0
--- /dev/null
+++ b/samples/ClientSample/Tcp/BufferExtensions.cs
@@ -0,0 +1,24 @@
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Text;
+
+namespace System
+{
+ public static class BufferExtensions
+ {
+ public static ArraySegment GetArray(this Memory memory)
+ {
+ return ((ReadOnlyMemory)memory).GetArray();
+ }
+
+ public static ArraySegment GetArray(this ReadOnlyMemory memory)
+ {
+ if (!MemoryMarshal.TryGetArray(memory, out var result))
+ {
+ throw new InvalidOperationException("Buffer backed by array was expected");
+ }
+ return result;
+ }
+ }
+}
diff --git a/samples/ClientSample/Tcp/SocketAwaitable.cs b/samples/ClientSample/Tcp/SocketAwaitable.cs
new file mode 100644
index 0000000000..d7b70425d6
--- /dev/null
+++ b/samples/ClientSample/Tcp/SocketAwaitable.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO.Pipelines;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ClientSample
+{
+ public class SocketAwaitable : ICriticalNotifyCompletion
+ {
+ private static readonly Action _callbackCompleted = () => { };
+
+ private readonly PipeScheduler _ioScheduler;
+
+ private Action _callback;
+ private int _bytesTransferred;
+ private SocketError _error;
+
+ public SocketAwaitable(PipeScheduler ioScheduler)
+ {
+ _ioScheduler = ioScheduler;
+ }
+
+ public SocketAwaitable GetAwaiter() => this;
+ public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
+
+ public int GetResult()
+ {
+ Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
+
+ _callback = null;
+
+ if (_error != SocketError.Success)
+ {
+ throw new SocketException((int)_error);
+ }
+
+ return _bytesTransferred;
+ }
+
+ public void OnCompleted(Action continuation)
+ {
+ if (ReferenceEquals(_callback, _callbackCompleted) ||
+ ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
+ {
+ Task.Run(continuation);
+ }
+ }
+
+ public void UnsafeOnCompleted(Action continuation)
+ {
+ OnCompleted(continuation);
+ }
+
+ public void Complete(int bytesTransferred, SocketError socketError)
+ {
+ _error = socketError;
+ _bytesTransferred = bytesTransferred;
+ var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
+
+ if (continuation != null)
+ {
+ _ioScheduler.Schedule(state => ((Action)state)(), continuation);
+ }
+ }
+ }
+}
diff --git a/samples/ClientSample/Tcp/SocketReceiver.cs b/samples/ClientSample/Tcp/SocketReceiver.cs
new file mode 100644
index 0000000000..8c6421f078
--- /dev/null
+++ b/samples/ClientSample/Tcp/SocketReceiver.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Collections.Generic;
+using System.IO.Pipelines;
+using System.Net.Sockets;
+using System.Text;
+
+namespace ClientSample
+{
+ public class SocketReceiver
+ {
+ private readonly Socket _socket;
+ private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
+ private readonly SocketAwaitable _awaitable;
+
+ public SocketReceiver(Socket socket, PipeScheduler scheduler)
+ {
+ _socket = socket;
+ _awaitable = new SocketAwaitable(scheduler);
+ _eventArgs.UserToken = _awaitable;
+ _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
+ }
+
+ public SocketAwaitable ReceiveAsync(Memory buffer)
+ {
+#if NETCOREAPP2_1
+ _eventArgs.SetBuffer(buffer);
+#else
+ var segment = buffer.GetArray();
+
+ _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
+#endif
+ if (!_socket.ReceiveAsync(_eventArgs))
+ {
+ _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
+ }
+
+ return _awaitable;
+ }
+ }
+}
diff --git a/samples/ClientSample/Tcp/SocketSender.cs b/samples/ClientSample/Tcp/SocketSender.cs
new file mode 100644
index 0000000000..7fa1c20ac5
--- /dev/null
+++ b/samples/ClientSample/Tcp/SocketSender.cs
@@ -0,0 +1,100 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO.Pipelines;
+using System.Net.Sockets;
+using System.Runtime.InteropServices;
+using System.Text;
+
+namespace ClientSample
+{
+ public class SocketSender
+ {
+ private readonly Socket _socket;
+ private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
+ private readonly SocketAwaitable _awaitable;
+
+ private List> _bufferList;
+
+ public SocketSender(Socket socket, PipeScheduler scheduler)
+ {
+ _socket = socket;
+ _awaitable = new SocketAwaitable(scheduler);
+ _eventArgs.UserToken = _awaitable;
+ _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
+ }
+
+ public SocketAwaitable SendAsync(ReadOnlySequence buffers)
+ {
+ if (buffers.IsSingleSegment)
+ {
+ return SendAsync(buffers.First);
+ }
+
+#if NETCOREAPP2_1
+ if (!_eventArgs.MemoryBuffer.Equals(Memory.Empty))
+#else
+ if (_eventArgs.Buffer != null)
+#endif
+ {
+ _eventArgs.SetBuffer(null, 0, 0);
+ }
+
+ _eventArgs.BufferList = GetBufferList(buffers);
+
+ if (!_socket.SendAsync(_eventArgs))
+ {
+ _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
+ }
+
+ return _awaitable;
+ }
+
+ private SocketAwaitable SendAsync(ReadOnlyMemory memory)
+ {
+ // The BufferList getter is much less expensive then the setter.
+ if (_eventArgs.BufferList != null)
+ {
+ _eventArgs.BufferList = null;
+ }
+
+#if NETCOREAPP2_1
+ _eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory));
+#else
+ var segment = memory.GetArray();
+
+ _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
+#endif
+ if (!_socket.SendAsync(_eventArgs))
+ {
+ _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
+ }
+
+ return _awaitable;
+ }
+
+ private List> GetBufferList(ReadOnlySequence buffer)
+ {
+ Debug.Assert(!buffer.IsEmpty);
+ Debug.Assert(!buffer.IsSingleSegment);
+
+ if (_bufferList == null)
+ {
+ _bufferList = new List>();
+ }
+ else
+ {
+ // Buffers are pooled, so it's OK to root them until the next multi-buffer write.
+ _bufferList.Clear();
+ }
+
+ foreach (var b in buffer)
+ {
+ _bufferList.Add(b.GetArray());
+ }
+
+ return _bufferList;
+ }
+ }
+}
diff --git a/samples/ClientSample/Tcp/TcpConnection.cs b/samples/ClientSample/Tcp/TcpConnection.cs
new file mode 100644
index 0000000000..a053486a7e
--- /dev/null
+++ b/samples/ClientSample/Tcp/TcpConnection.cs
@@ -0,0 +1,242 @@
+using System;
+using System.IO;
+using System.IO.Pipelines;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using Microsoft.AspNetCore.Connections;
+using Microsoft.AspNetCore.Http.Features;
+
+namespace ClientSample
+{
+ public class TcpConnection : IConnection
+ {
+ private readonly Socket _socket;
+ private volatile bool _aborted;
+ private readonly EndPoint _endPoint;
+ private IDuplexPipe _transport;
+ private IDuplexPipe _application;
+ private SocketSender _sender;
+ private SocketReceiver _receiver;
+
+ public TcpConnection(EndPoint endPoint)
+ {
+ _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
+ _endPoint = endPoint;
+
+ _sender = new SocketSender(_socket, PipeScheduler.ThreadPool);
+ _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool);
+ }
+
+ public IDuplexPipe Transport => _transport;
+
+ public IFeatureCollection Features { get; } = new FeatureCollection();
+
+ public Task DisposeAsync()
+ {
+ _transport?.Output.Complete();
+ _transport?.Input.Complete();
+
+ _socket?.Dispose();
+
+ return Task.CompletedTask;
+ }
+
+ public async Task StartAsync()
+ {
+ await _socket.ConnectAsync(_endPoint);
+
+ var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
+
+ _transport = pair.Transport;
+ _application = pair.Application;
+
+ _ = ExecuteAsync();
+ }
+
+ private async Task ExecuteAsync()
+ {
+ Exception sendError = null;
+ try
+ {
+ // Spawn send and receive logic
+ Task receiveTask = DoReceive();
+ Task sendTask = DoSend();
+
+ // If the sending task completes then close the receive
+ // We don't need to do this in the other direction because the kestrel
+ // will trigger the output closing once the input is complete.
+ if (await Task.WhenAny(receiveTask, sendTask) == sendTask)
+ {
+ // Tell the reader it's being aborted
+ _socket.Dispose();
+ }
+
+ // Now wait for both to complete
+ await receiveTask;
+ sendError = await sendTask;
+
+ // Dispose the socket(should noop if already called)
+ _socket.Dispose();
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Unexpected exception in {nameof(TcpConnection)}.{nameof(StartAsync)}: " + ex);
+ }
+ finally
+ {
+ // Complete the output after disposing the socket
+ _application.Input.Complete(sendError);
+ }
+ }
+ private async Task DoReceive()
+ {
+ Exception error = null;
+
+ try
+ {
+ await ProcessReceives();
+ }
+ catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
+ {
+ error = new ConnectionResetException(ex.Message, ex);
+ }
+ catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted ||
+ ex.SocketErrorCode == SocketError.ConnectionAborted ||
+ ex.SocketErrorCode == SocketError.Interrupted ||
+ ex.SocketErrorCode == SocketError.InvalidArgument)
+ {
+ if (!_aborted)
+ {
+ // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
+ error = new ConnectionAbortedException();
+ }
+ }
+ catch (ObjectDisposedException)
+ {
+ if (!_aborted)
+ {
+ error = new ConnectionAbortedException();
+ }
+ }
+ catch (IOException ex)
+ {
+ error = ex;
+ }
+ catch (Exception ex)
+ {
+ error = new IOException(ex.Message, ex);
+ }
+ finally
+ {
+ if (_aborted)
+ {
+ error = error ?? new ConnectionAbortedException();
+ }
+
+ _application.Output.Complete(error);
+ }
+ }
+
+ private async Task ProcessReceives()
+ {
+ while (true)
+ {
+ // Ensure we have some reasonable amount of buffer space
+ var buffer = _application.Output.GetMemory();
+
+ var bytesReceived = await _receiver.ReceiveAsync(buffer);
+
+ if (bytesReceived == 0)
+ {
+ // FIN
+ break;
+ }
+
+ _application.Output.Advance(bytesReceived);
+
+ var flushTask = _application.Output.FlushAsync();
+
+ if (!flushTask.IsCompleted)
+ {
+ await flushTask;
+ }
+
+ var result = flushTask.GetAwaiter().GetResult();
+ if (result.IsCompleted)
+ {
+ // Pipe consumer is shut down, do we stop writing
+ break;
+ }
+ }
+ }
+
+ private async Task DoSend()
+ {
+ Exception error = null;
+
+ try
+ {
+ await ProcessSends();
+ }
+ catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
+ {
+ error = null;
+ }
+ catch (ObjectDisposedException)
+ {
+ error = null;
+ }
+ catch (IOException ex)
+ {
+ error = ex;
+ }
+ catch (Exception ex)
+ {
+ error = new IOException(ex.Message, ex);
+ }
+ finally
+ {
+ _aborted = true;
+ _socket.Shutdown(SocketShutdown.Both);
+ }
+
+ return error;
+ }
+
+ private async Task ProcessSends()
+ {
+ while (true)
+ {
+ // Wait for data to write from the pipe producer
+ var result = await _application.Input.ReadAsync();
+ var buffer = result.Buffer;
+
+ if (result.IsCanceled)
+ {
+ break;
+ }
+
+ var end = buffer.End;
+ var isCompleted = result.IsCompleted;
+ if (!buffer.IsEmpty)
+ {
+ await _sender.SendAsync(buffer);
+ }
+
+ _application.Input.AdvanceTo(end);
+
+ if (isCompleted)
+ {
+ break;
+ }
+ }
+ }
+
+ public Task StartAsync(TransferFormat transferFormat)
+ {
+ // Transfer format is irrelevant
+ return StartAsync();
+ }
+ }
+}
diff --git a/samples/ClientSample/Tcp/TcpHubConnectionBuilderExtensions.cs b/samples/ClientSample/Tcp/TcpHubConnectionBuilderExtensions.cs
new file mode 100644
index 0000000000..775ef6f6ab
--- /dev/null
+++ b/samples/ClientSample/Tcp/TcpHubConnectionBuilderExtensions.cs
@@ -0,0 +1,15 @@
+using System.Net;
+using ClientSample;
+
+namespace Microsoft.AspNetCore.SignalR.Client
+{
+ public static class TcpHubConnectionBuilderExtensions
+ {
+ public static IHubConnectionBuilder WithEndPoint(this IHubConnectionBuilder builder, IPEndPoint endPoint)
+ {
+ builder.ConfigureConnectionFactory(() => new TcpConnection(endPoint));
+
+ return builder;
+ }
+ }
+}
diff --git a/samples/SignalRSamples/Program.cs b/samples/SignalRSamples/Program.cs
index 7b3cd23722..2f932a18a4 100644
--- a/samples/SignalRSamples/Program.cs
+++ b/samples/SignalRSamples/Program.cs
@@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
+using System.Net;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;
@@ -31,9 +32,8 @@ namespace SignalRSamples
options.ListenLocalhost(5000);
// Hub bound to TCP end point
- options.ListenLocalhost(9001, builder =>
+ options.Listen(IPAddress.Any, 9001, builder =>
{
- // Run the hub on this port (this won't work properly until streaming parsing is implemented)
builder.UseHub();
});
})
diff --git a/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs b/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs
index b297431276..6d04744284 100644
--- a/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs
+++ b/src/Microsoft.AspNetCore.SignalR.Core/DefaultUserIdProvider.cs
@@ -9,7 +9,7 @@ namespace Microsoft.AspNetCore.SignalR.Core
{
public string GetUserId(HubConnectionContext connection)
{
- return connection.User.FindFirst(ClaimTypes.NameIdentifier)?.Value;
+ return connection.User?.FindFirst(ClaimTypes.NameIdentifier)?.Value;
}
}
}
\ No newline at end of file