Get rid of RpcEndpoint and samples
- Merge RpcEndpoint and HubEndPoint, still need to move discovery of hub methods to another class.
This commit is contained in:
parent
f51fcadeb1
commit
b114e4e9fd
|
|
@ -1,15 +0,0 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace SocketsSample
|
||||
{
|
||||
public class Echo
|
||||
{
|
||||
public string Send(string message)
|
||||
{
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using SocketsSample.Hubs;
|
||||
|
|
@ -50,8 +49,7 @@ namespace SocketsSample
|
|||
|
||||
app.UseSockets(routes =>
|
||||
{
|
||||
routes.MapSocketEndpoint<ChatEndPoint>("/chat");
|
||||
routes.MapSocketEndpoint<RpcEndpoint<Echo>>("/jsonrpc");
|
||||
routes.MapEndpoint<ChatEndPoint>("/chat");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,15 +6,13 @@
|
|||
</head>
|
||||
<body>
|
||||
<h1>ASP.NET Sockets</h1>
|
||||
<h2>Chat sample</h2>
|
||||
<ul>
|
||||
<li><a href="sse.html">Server Sent Events</a></li>
|
||||
<li><a href="polling.html">Long polling</a></li>
|
||||
<li><a href="ws.html">Web Sockets</a></li>
|
||||
</ul>
|
||||
<h2>JSON RPC</h2>
|
||||
<h1>ASP.NET SignalR</h1>
|
||||
<ul>
|
||||
<li><a href="rpc.html">JSON RPC</a></li>
|
||||
<li><a href="hubs.html">Hubs</a></li>
|
||||
</ul>
|
||||
</body>
|
||||
|
|
|
|||
|
|
@ -1,79 +0,0 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<title></title>
|
||||
<script>
|
||||
function jsonRpc(url) {
|
||||
var ws = new WebSocket(url);
|
||||
var id = 0;
|
||||
var calls = {};
|
||||
|
||||
ws.onopen = function () {
|
||||
console.log('Opened!');
|
||||
};
|
||||
|
||||
ws.onmessage = function (event) {
|
||||
var response = JSON.parse(event.data);
|
||||
|
||||
var cb = calls[response.Id];
|
||||
|
||||
delete calls[response.Id];
|
||||
|
||||
if (response.Error) {
|
||||
cb.error(response.Error);
|
||||
}
|
||||
else {
|
||||
cb.success(response.Result);
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = function (event) {
|
||||
console.log('Closed!');
|
||||
};
|
||||
|
||||
this.invoke = function (method, args) {
|
||||
return new Promise((resolve, reject) => {
|
||||
calls[id] = { success: resolve, error: reject };
|
||||
ws.send(JSON.stringify({ method: method, arguments: args, id: id }));
|
||||
id++;
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
document.addEventListener('DOMContentLoaded', () => {
|
||||
var rpc = new jsonRpc(`ws://${document.location.host}/jsonrpc/ws`);
|
||||
|
||||
document.getElementById('sendmessage').addEventListener('submit', event => {
|
||||
let data = document.getElementById('data').value;
|
||||
|
||||
rpc.invoke('SocketsSample.Echo.Send', [data]).then(result => {
|
||||
var child = document.createElement('li');
|
||||
child.innerText = result;
|
||||
document.getElementById('messages').appendChild(child);
|
||||
})
|
||||
.catch(err => {
|
||||
var child = document.createElement('li');
|
||||
child.style.color = 'red';
|
||||
child.innerText = err;
|
||||
document.getElementById('messages').appendChild(child);
|
||||
});
|
||||
|
||||
event.preventDefault();
|
||||
});
|
||||
});
|
||||
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
<h1>WebSockets</h1>
|
||||
|
||||
<form id="sendmessage">
|
||||
<input type="text" id="data" />
|
||||
<input type="submit" value="Send" />
|
||||
</form>
|
||||
|
||||
<ul id="messages"></ul>
|
||||
</body>
|
||||
</html>
|
||||
|
|
@ -12,7 +12,6 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
services.AddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
|
||||
services.AddSingleton(typeof(IHubContext<>), typeof(HubContext<>));
|
||||
services.AddSingleton(typeof(HubEndPoint<>), typeof(HubEndPoint<>));
|
||||
services.AddSingleton(typeof(RpcEndpoint<>), typeof(RpcEndpoint<>));
|
||||
services.AddSingleton<IConfigureOptions<SignalROptions>, SignalROptionsSetup>();
|
||||
services.AddSingleton<JsonNetInvocationAdapter>();
|
||||
services.AddSingleton<InvocationAdapterRegistry>();
|
||||
|
|
|
|||
|
|
@ -1,29 +1,47 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Threading.Tasks;
|
||||
using Channels;
|
||||
using Microsoft.AspNetCore.Sockets;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.SignalR
|
||||
{
|
||||
public class HubEndPoint<THub> : RpcEndpoint<THub> where THub : Hub
|
||||
public class HubEndPoint<THub> : EndPoint where THub : Hub
|
||||
{
|
||||
private readonly Dictionary<string, Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>>> _callbacks
|
||||
= new Dictionary<string, Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>>>(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly Dictionary<string, Type[]> _paramTypes = new Dictionary<string, Type[]>();
|
||||
|
||||
private readonly HubLifetimeManager<THub> _lifetimeManager;
|
||||
private readonly IHubContext<THub> _hubContext;
|
||||
private readonly ILogger<HubEndPoint<THub>> _logger;
|
||||
private readonly InvocationAdapterRegistry _registry;
|
||||
private readonly IServiceScopeFactory _serviceScopeFactory;
|
||||
|
||||
public HubEndPoint(HubLifetimeManager<THub> lifetimeManager,
|
||||
IHubContext<THub> hubContext,
|
||||
InvocationAdapterRegistry registry,
|
||||
ILoggerFactory loggerFactory,
|
||||
ILogger<HubEndPoint<THub>> logger,
|
||||
IServiceScopeFactory serviceScopeFactory)
|
||||
: base(registry, loggerFactory, serviceScopeFactory)
|
||||
{
|
||||
_lifetimeManager = lifetimeManager;
|
||||
_hubContext = hubContext;
|
||||
_registry = registry;
|
||||
_logger = logger;
|
||||
_serviceScopeFactory = serviceScopeFactory;
|
||||
|
||||
DiscoverHubMethods();
|
||||
}
|
||||
|
||||
public override async Task OnConnectedAsync(Connection connection)
|
||||
{
|
||||
// TODO: Dispatch from the caller
|
||||
await Task.Yield();
|
||||
|
||||
try
|
||||
{
|
||||
await _lifetimeManager.OnConnectedAsync(connection);
|
||||
|
|
@ -31,18 +49,18 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
using (var scope = _serviceScopeFactory.CreateScope())
|
||||
{
|
||||
var hub = scope.ServiceProvider.GetService<THub>() ?? Activator.CreateInstance<THub>();
|
||||
Initialize(connection, hub);
|
||||
InitializeHub(connection, hub);
|
||||
await hub.OnConnectedAsync();
|
||||
}
|
||||
|
||||
await base.OnConnectedAsync(connection);
|
||||
await DispatchMessagesAsync(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
using (var scope = _serviceScopeFactory.CreateScope())
|
||||
{
|
||||
var hub = scope.ServiceProvider.GetService<THub>() ?? Activator.CreateInstance<THub>();
|
||||
Initialize(connection, hub);
|
||||
InitializeHub(connection, hub);
|
||||
await hub.OnDisconnectedAsync();
|
||||
}
|
||||
|
||||
|
|
@ -50,16 +68,155 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
}
|
||||
}
|
||||
|
||||
protected override void BeforeInvoke(Connection connection, THub hub)
|
||||
private async Task DispatchMessagesAsync(Connection connection)
|
||||
{
|
||||
Initialize(connection, hub);
|
||||
var stream = connection.Channel.GetStream();
|
||||
var invocationAdapter = _registry.GetInvocationAdapter(connection.Metadata.Get<string>("formatType"));
|
||||
|
||||
while (true)
|
||||
{
|
||||
var invocationDescriptor =
|
||||
await invocationAdapter.ReadInvocationDescriptorAsync(
|
||||
stream, methodName =>
|
||||
{
|
||||
Type[] types;
|
||||
// TODO: null or throw?
|
||||
return _paramTypes.TryGetValue(methodName, out types) ? types : null;
|
||||
});
|
||||
|
||||
// Is there a better way of detecting that a connection was closed?
|
||||
if (invocationDescriptor == null)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (_logger.IsEnabled(LogLevel.Debug))
|
||||
{
|
||||
_logger.LogDebug("Received hub invocation: {invocation}", invocationDescriptor);
|
||||
}
|
||||
|
||||
InvocationResultDescriptor result;
|
||||
Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>> callback;
|
||||
if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback))
|
||||
{
|
||||
result = await callback(connection, invocationDescriptor);
|
||||
}
|
||||
else
|
||||
{
|
||||
// If there's no method then return a failed response for this request
|
||||
result = new InvocationResultDescriptor
|
||||
{
|
||||
Id = invocationDescriptor.Id,
|
||||
Error = $"Unknown hub method '{invocationDescriptor.Method}'"
|
||||
};
|
||||
|
||||
_logger.LogError("Unknown hub method '{method}'", invocationDescriptor.Method);
|
||||
}
|
||||
|
||||
await invocationAdapter.WriteInvocationResultAsync(result, stream);
|
||||
}
|
||||
}
|
||||
|
||||
private void Initialize(Connection connection, THub hub)
|
||||
private void InitializeHub(Connection connection, THub hub)
|
||||
{
|
||||
hub.Clients = _hubContext.Clients;
|
||||
hub.Context = new HubCallerContext(connection);
|
||||
hub.Groups = new GroupManager<THub>(connection, _lifetimeManager);
|
||||
}
|
||||
|
||||
private void DiscoverHubMethods()
|
||||
{
|
||||
var type = typeof(THub);
|
||||
|
||||
foreach (var methodInfo in type.GetTypeInfo().DeclaredMethods.Where(m => IsHubMethod(m)))
|
||||
{
|
||||
var methodName = type.FullName + "." + methodInfo.Name;
|
||||
|
||||
if (_callbacks.ContainsKey(methodName))
|
||||
{
|
||||
throw new NotSupportedException($"Duplicate definitions of '{methodInfo.Name}'. Overloading is not supported.");
|
||||
}
|
||||
|
||||
var parameters = methodInfo.GetParameters();
|
||||
_paramTypes[methodName] = parameters.Select(p => p.ParameterType).ToArray();
|
||||
|
||||
if (_logger.IsEnabled(LogLevel.Debug))
|
||||
{
|
||||
_logger.LogDebug("Hub method '{methodName}' is bound", methodName);
|
||||
}
|
||||
|
||||
_callbacks[methodName] = async (connection, invocationDescriptor) =>
|
||||
{
|
||||
var invocationResult = new InvocationResultDescriptor()
|
||||
{
|
||||
Id = invocationDescriptor.Id
|
||||
};
|
||||
|
||||
using (var scope = _serviceScopeFactory.CreateScope())
|
||||
{
|
||||
var hub = scope.ServiceProvider.GetService<THub>();
|
||||
|
||||
bool created = false;
|
||||
|
||||
if (hub == null)
|
||||
{
|
||||
hub = Activator.CreateInstance<THub>();
|
||||
created = true;
|
||||
}
|
||||
|
||||
InitializeHub(connection, hub);
|
||||
|
||||
try
|
||||
{
|
||||
var arguments = invocationDescriptor.Arguments ?? Array.Empty<object>();
|
||||
|
||||
var args = arguments
|
||||
.Zip(parameters, (a, p) => Convert.ChangeType(a, p.ParameterType))
|
||||
.ToArray();
|
||||
|
||||
var result = methodInfo.Invoke(hub, args);
|
||||
var resultTask = result as Task;
|
||||
if (resultTask != null)
|
||||
{
|
||||
await resultTask;
|
||||
if (methodInfo.ReturnType.GetTypeInfo().IsGenericType)
|
||||
{
|
||||
var property = resultTask.GetType().GetProperty("Result");
|
||||
invocationResult.Result = property?.GetValue(resultTask);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
invocationResult.Result = result;
|
||||
}
|
||||
}
|
||||
catch (TargetInvocationException ex)
|
||||
{
|
||||
_logger.LogError(0, ex, "Failed to invoke hub method");
|
||||
invocationResult.Error = ex.InnerException.Message;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(0, ex, "Failed to invoke hub method");
|
||||
invocationResult.Error = ex.Message;
|
||||
}
|
||||
|
||||
if (created)
|
||||
{
|
||||
// Dispose the object if it's disposable and we created it
|
||||
(hub as IDisposable)?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
return invocationResult;
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
private static bool IsHubMethod(MethodInfo m)
|
||||
{
|
||||
// TODO: Add more checks
|
||||
return m.IsPublic;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,185 +0,0 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Threading.Tasks;
|
||||
using Channels;
|
||||
using Microsoft.AspNetCore.Sockets;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.SignalR
|
||||
{
|
||||
// REVIEW: Should there be an RPC package?
|
||||
public class RpcEndpoint<T> : EndPoint where T : class
|
||||
{
|
||||
private readonly Dictionary<string, Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>>> _callbacks
|
||||
= new Dictionary<string, Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>>>(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly Dictionary<string, Type[]> _paramTypes = new Dictionary<string, Type[]>();
|
||||
|
||||
private readonly ILogger _logger;
|
||||
private readonly InvocationAdapterRegistry _registry;
|
||||
protected readonly IServiceScopeFactory _serviceScopeFactory;
|
||||
|
||||
public RpcEndpoint(InvocationAdapterRegistry registry, ILoggerFactory loggerFactory, IServiceScopeFactory serviceScopeFactory)
|
||||
{
|
||||
_logger = loggerFactory.CreateLogger<RpcEndpoint<T>>();
|
||||
_registry = registry;
|
||||
_serviceScopeFactory = serviceScopeFactory;
|
||||
|
||||
RegisterRPCEndPoint();
|
||||
}
|
||||
|
||||
public override async Task OnConnectedAsync(Connection connection)
|
||||
{
|
||||
// TODO: Dispatch from the caller
|
||||
await Task.Yield();
|
||||
|
||||
var stream = connection.Channel.GetStream();
|
||||
var invocationAdapter = _registry.GetInvocationAdapter(connection.Metadata.Get<string>("formatType"));
|
||||
|
||||
while (true)
|
||||
{
|
||||
var invocationDescriptor =
|
||||
await invocationAdapter.ReadInvocationDescriptorAsync(
|
||||
stream, methodName =>
|
||||
{
|
||||
Type[] types;
|
||||
// TODO: null or throw?
|
||||
return _paramTypes.TryGetValue(methodName, out types) ? types : null;
|
||||
});
|
||||
|
||||
// Is there a better way of detecting that a connection was closed?
|
||||
if (invocationDescriptor == null)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (_logger.IsEnabled(LogLevel.Debug))
|
||||
{
|
||||
_logger.LogDebug("Received RPC request: {request}", invocationDescriptor.ToString());
|
||||
}
|
||||
|
||||
InvocationResultDescriptor result;
|
||||
Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>> callback;
|
||||
if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback))
|
||||
{
|
||||
result = await callback(connection, invocationDescriptor);
|
||||
}
|
||||
else
|
||||
{
|
||||
// If there's no method then return a failed response for this request
|
||||
result = new InvocationResultDescriptor
|
||||
{
|
||||
Id = invocationDescriptor.Id,
|
||||
Error = $"Unknown method '{invocationDescriptor.Method}'"
|
||||
};
|
||||
}
|
||||
|
||||
await invocationAdapter.WriteInvocationResultAsync(result, stream);
|
||||
}
|
||||
}
|
||||
|
||||
protected virtual void BeforeInvoke(Connection connection, T endpoint)
|
||||
{
|
||||
}
|
||||
|
||||
protected virtual void AfterInvoke(Connection connection, T endpoint)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
protected void RegisterRPCEndPoint()
|
||||
{
|
||||
var type = typeof(T);
|
||||
|
||||
foreach (var methodInfo in type.GetTypeInfo().DeclaredMethods.Where(m => m.IsPublic))
|
||||
{
|
||||
var methodName = type.FullName + "." + methodInfo.Name;
|
||||
|
||||
if (_callbacks.ContainsKey(methodName))
|
||||
{
|
||||
throw new NotSupportedException($"Duplicate definitions of '{methodInfo.Name}'. Overloading is not supported.");
|
||||
}
|
||||
|
||||
var parameters = methodInfo.GetParameters();
|
||||
_paramTypes[methodName] = parameters.Select(p => p.ParameterType).ToArray();
|
||||
|
||||
if (_logger.IsEnabled(LogLevel.Debug))
|
||||
{
|
||||
_logger.LogDebug("RPC method '{methodName}' is bound", methodName);
|
||||
}
|
||||
|
||||
_callbacks[methodName] = async (connection, invocationDescriptor) =>
|
||||
{
|
||||
var invocationResult = new InvocationResultDescriptor()
|
||||
{
|
||||
Id = invocationDescriptor.Id
|
||||
};
|
||||
|
||||
using (var scope = _serviceScopeFactory.CreateScope())
|
||||
{
|
||||
var value = scope.ServiceProvider.GetService<T>();
|
||||
|
||||
bool created = false;
|
||||
|
||||
if (value == null)
|
||||
{
|
||||
value = Activator.CreateInstance<T>();
|
||||
created = true;
|
||||
}
|
||||
|
||||
BeforeInvoke(connection, value);
|
||||
|
||||
try
|
||||
{
|
||||
var arguments = invocationDescriptor.Arguments ?? Array.Empty<object>();
|
||||
|
||||
var args = arguments
|
||||
.Zip(parameters, (a, p) => Convert.ChangeType(a, p.ParameterType))
|
||||
.ToArray();
|
||||
|
||||
var result = methodInfo.Invoke(value, args);
|
||||
var resultTask = result as Task;
|
||||
if (resultTask != null)
|
||||
{
|
||||
await resultTask;
|
||||
if (methodInfo.ReturnType.GetTypeInfo().IsGenericType)
|
||||
{
|
||||
var property = resultTask.GetType().GetProperty("Result");
|
||||
invocationResult.Result = property?.GetValue(resultTask);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
invocationResult.Result = result;
|
||||
}
|
||||
}
|
||||
catch (TargetInvocationException ex)
|
||||
{
|
||||
_logger.LogError(0, ex, "Failed to invoke RPC method");
|
||||
invocationResult.Error = ex.InnerException.Message;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(0, ex, "Failed to invoke RPC method");
|
||||
invocationResult.Error = ex.Message;
|
||||
}
|
||||
finally
|
||||
{
|
||||
AfterInvoke(connection, value);
|
||||
}
|
||||
|
||||
if (created)
|
||||
{
|
||||
// Dispose the object if it's disposable and we created it
|
||||
(value as IDisposable)?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
return invocationResult;
|
||||
};
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.Builder
|
|||
|
||||
public void MapHub<THub>(string path) where THub : Hub
|
||||
{
|
||||
_routes.MapSocketEndpoint<HubEndPoint<THub>>(path);
|
||||
_routes.MapEndpoint<HubEndPoint<THub>>(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.Builder
|
|||
_dispatcher = dispatcher;
|
||||
}
|
||||
|
||||
public void MapSocketEndpoint<TEndPoint>(string path) where TEndPoint : EndPoint
|
||||
public void MapEndpoint<TEndPoint>(string path) where TEndPoint : EndPoint
|
||||
{
|
||||
_routes.AddPrefixRoute(path, new RouteHandler(c => _dispatcher.ExecuteAsync<TEndPoint>(path, c)));
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue