Added hub sample

This commit is contained in:
David Fowler 2016-10-02 03:23:23 -07:00
parent 58e58b7fb7
commit 1a5138e972
8 changed files with 278 additions and 6 deletions

View File

@ -0,0 +1,81 @@
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using SocketsSample.Hubs;
namespace SocketsSample
{
public class HubEndpoint : JsonRpcEndpoint
{
private readonly ILogger<HubEndpoint> _logger;
private readonly Bus _bus = new Bus();
public HubEndpoint(ILogger<HubEndpoint> logger, ILogger<JsonRpcEndpoint> jsonRpcLogger, IServiceProvider serviceProvider)
: base(jsonRpcLogger, serviceProvider)
{
_logger = logger;
}
public override Task OnConnected(Connection connection)
{
// TODO: Get the list of hubs and signals over the connection
// Subscribe to the hub
_bus.Subscribe(nameof(Chat), message => OnMessage(connection, message));
// Subscribe to the connection id
_bus.Subscribe(connection.ConnectionId, message => OnMessage(connection, message));
return base.OnConnected(connection);
}
protected override bool HandleResponse(string connectionId, JObject response)
{
var ignore = _bus.Publish(connectionId, new Message
{
Payload = Encoding.UTF8.GetBytes(response.ToString())
});
return true;
}
private Task OnMessage(Connection connection, Message message)
{
return connection.Channel.Output.WriteAsync(message.Payload);
}
public Task Invoke(string key, string method, object[] args)
{
var obj = new JObject();
obj["method"] = method;
obj["params"] = new JArray(args.Select(a => JToken.FromObject(a)).ToArray());
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Outgoing RPC invocation method '{methodName}' to {signal}", method, key);
}
return _bus.Publish(key, new Message
{
Payload = Encoding.UTF8.GetBytes(obj.ToString())
});
}
protected override void Initialize(object endpoint)
{
((Hub)endpoint).Clients = new HubConnectionContext(endpoint.GetType().Name, this);
base.Initialize(endpoint);
}
protected override void DiscoverEndpoints()
{
// Register the chat hub
RegisterJsonRPCEndPoint(typeof(Chat));
}
}
}

View File

@ -26,6 +26,11 @@ namespace SocketsSample
_logger = logger;
_serviceProvider = serviceProvider;
DiscoverEndpoints();
}
protected virtual void DiscoverEndpoints()
{
RegisterJsonRPCEndPoint(typeof(Echo));
}
@ -81,15 +86,28 @@ namespace SocketsSample
response["error"] = string.Format("Unknown method '{0}'", request.Value<string>("method"));
}
_logger.LogDebug("Sending JSON RPC response: {data}", response);
if (!HandleResponse(connection.ConnectionId, response))
{
_logger.LogDebug("Sending JSON RPC response: {data}", response);
var writer = new JsonTextWriter(new StreamWriter(stream));
response.WriteTo(writer);
writer.Flush();
var writer = new JsonTextWriter(new StreamWriter(stream));
response.WriteTo(writer);
writer.Flush();
}
}
}
private void RegisterJsonRPCEndPoint(Type type)
protected virtual bool HandleResponse(string connectionId, JObject response)
{
return false;
}
protected virtual void Initialize(object endpoint)
{
}
protected void RegisterJsonRPCEndPoint(Type type)
{
var methods = new List<string>();
@ -123,6 +141,8 @@ namespace SocketsSample
{
object value = scope.ServiceProvider.GetService(type) ?? Activator.CreateInstance(type);
Initialize(value);
try
{
var args = request.Value<JArray>("params").Zip(parameters, (a, p) => a.ToObject(p.ParameterType))

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace SocketsSample.Hubs
{
public class Chat : Hub
{
public void Send(string message)
{
Clients.All.Invoke("Send", message);
}
}
}

View File

@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace SocketsSample.Hubs
{
public class Hub
{
public IHubConnectionContext Clients { get; set; }
}
public interface IHubConnectionContext
{
IClientProxy All { get; }
IClientProxy Client(string connectionId);
}
public class HubConnectionContext : IHubConnectionContext
{
private readonly HubEndpoint _endPoint;
public HubConnectionContext(string hubName, HubEndpoint endpoint)
{
_endPoint = endpoint;
All = new HubClientProxy(endpoint, hubName);
}
public IClientProxy All { get; }
public IClientProxy Client(string connectionId)
{
return new HubClientProxy(_endPoint, connectionId);
}
}
public class HubClientProxy : IClientProxy
{
private readonly HubEndpoint _endPoint;
private readonly string _key;
public HubClientProxy(HubEndpoint endPoint, string key)
{
_endPoint = endPoint;
_key = key;
}
public Task Invoke(string method, params object[] args)
{
return _endPoint.Invoke(_key, method, args);
}
}
public interface IClientProxy
{
/// <summary>
/// Invokes a method on the connection(s) represented by the <see cref="IClientProxy"/> instance.
/// </summary>
/// <param name="method">name of the method to invoke</param>
/// <param name="args">argumetns to pass to the client</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
Task Invoke(string method, params object[] args);
}
}

View File

@ -14,6 +14,7 @@ namespace SocketsSample
{
services.AddRouting();
services.AddSingleton<HubEndpoint>();
services.AddSingleton<JsonRpcEndpoint>();
services.AddSingleton<ChatEndPoint>();
}
@ -32,6 +33,7 @@ namespace SocketsSample
app.UseSockets(d =>
{
d.MapSocketEndpoint<HubEndpoint>("/hubs");
d.MapSocketEndpoint<ChatEndPoint>("/chat");
d.MapSocketEndpoint<JsonRpcEndpoint>("/jsonrpc");
});

View File

@ -0,0 +1,87 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
<script>
function hubConnection(url) {
var ws = new WebSocket(url);
var id = 0;
var calls = {};
var methods = {};
ws.onopen = function () {
console.log('Opened!');
};
ws.onmessage = function (evt) {
var response = JSON.parse(evt.data);
// Response
if (response.id) {
var cb = calls[response.id];
delete calls[response.id];
if (response.error) {
cb.error(response.error);
}
else {
cb.success(response.result);
}
}
else {
// Reverse JSON RPC
methods[response.method](response.params);
}
};
ws.onclose = function (evt) {
console.log('Closed!');
};
this.invoke = function (method, args) {
return new Promise((resolve, reject) => {
calls[id] = { success: resolve, error: reject };
ws.send(JSON.stringify({ method: method, params: args, id: id }));
id++;
});
};
this.on = function (method, fn) {
methods[method] = fn;
};
}
document.addEventListener('DOMContentLoaded', () => {
var conn = new hubConnection('ws://localhost:5000/hubs/ws');
conn.on('Send', function (message) {
var child = document.createElement('li');
child.innerText = message;
document.getElementById('messages').appendChild(child);
});
document.getElementById('sendmessage').addEventListener('click', () => {
let data = document.getElementById('data').value;
conn.invoke('SocketsSample.Hubs.Chat.Send', [data]).catch(err => {
var child = document.createElement('li');
child.style.color = 'red';
child.innerText = err;
document.getElementById('messages').appendChild(child);
});
});
});
</script>
</head>
<body>
<h1>WebSockets</h1>
<input type="text" id="data" />
<input type="button" id="sendmessage" value="Send" />
<ul id="messages"></ul>
</body>
</html>

View File

@ -13,6 +13,9 @@
<li><a href="ws.html">Web Sockets</a></li>
</ul>
<h2>JSON RPC</h2>
<a href="rpc.html">JSON RPC</a>
<ul>
<li><a href="rpc.html">JSON RPC</a></li>
<li><a href="hubs.html">Hubs</a></li>
</ul>
</body>
</html>