From f64c986b5dbf10eac6546d47dfef9b257e4c703d Mon Sep 17 00:00:00 2001 From: moozzyk Date: Tue, 4 Oct 2016 15:55:35 -0700 Subject: [PATCH] Adding support to non-Json hub invocation --- .../SocketsSample/EndPoints/HubEndpoint.cs | 26 +- .../{JsonRpcEndpoint.cs => RpcEndpoint.cs} | 71 ++-- samples/SocketsSample/InvocationDescriptor.cs | 16 + .../InvocationDescriptorBuilder.cs | 11 + .../InvocationResultDescriptor.cs | 16 + .../ProtobufInvocationDescriptorBuilder.cs | 49 +++ .../SocketsSample/Protobuf/RpcInvocation.cs | 390 ++++++++++++++++++ .../Protobuf/RpcInvocation.proto | 14 + samples/SocketsSample/RpcFormatterFactory.cs | 24 ++ samples/SocketsSample/RpcJSonFormatter.cs | 27 ++ samples/SocketsSample/RpcTextFormatter.cs | 58 +++ samples/SocketsSample/Startup.cs | 8 +- samples/SocketsSample/wwwroot/hubs.html | 68 ++- .../HttpConnectionDispatcher.cs | 4 + .../IFormatter.cs | 12 + .../IFormatterFactory.cs | 8 + 16 files changed, 739 insertions(+), 63 deletions(-) rename samples/SocketsSample/EndPoints/{JsonRpcEndpoint.cs => RpcEndpoint.cs} (62%) create mode 100644 samples/SocketsSample/InvocationDescriptor.cs create mode 100644 samples/SocketsSample/InvocationDescriptorBuilder.cs create mode 100644 samples/SocketsSample/InvocationResultDescriptor.cs create mode 100644 samples/SocketsSample/Protobuf/ProtobufInvocationDescriptorBuilder.cs create mode 100644 samples/SocketsSample/Protobuf/RpcInvocation.cs create mode 100644 samples/SocketsSample/Protobuf/RpcInvocation.proto create mode 100644 samples/SocketsSample/RpcFormatterFactory.cs create mode 100644 samples/SocketsSample/RpcJSonFormatter.cs create mode 100644 samples/SocketsSample/RpcTextFormatter.cs create mode 100644 src/Microsoft.AspNetCore.Sockets/IFormatter.cs create mode 100644 src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs diff --git a/samples/SocketsSample/EndPoints/HubEndpoint.cs b/samples/SocketsSample/EndPoints/HubEndpoint.cs index eb1bab8533..f0048590c5 100644 --- a/samples/SocketsSample/EndPoints/HubEndpoint.cs +++ b/samples/SocketsSample/EndPoints/HubEndpoint.cs @@ -4,20 +4,24 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Channels; +using Microsoft.AspNetCore.Sockets; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using SocketsSample.Hubs; namespace SocketsSample { - public class HubEndpoint : JsonRpcEndpoint, IHubConnectionContext + public class HubEndpoint : RpcEndpoint, IHubConnectionContext { private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; - public HubEndpoint(ILogger logger, ILogger jsonRpcLogger, IServiceProvider serviceProvider) + public HubEndpoint(ILogger logger, ILogger jsonRpcLogger, IServiceProvider serviceProvider) : base(jsonRpcLogger, serviceProvider) { _logger = logger; + _serviceProvider = serviceProvider; All = new AllClientProxy(this); } @@ -51,7 +55,7 @@ namespace SocketsSample protected override void DiscoverEndpoints() { // Register the chat hub - RegisterJsonRPCEndPoint(typeof(Chat)); + RegisterRPCEndPoint(typeof(Chat)); } private class AllClientProxy : IClientProxy @@ -67,17 +71,19 @@ namespace SocketsSample { // REVIEW: Thread safety var tasks = new List(_endPoint.Connections.Count); + var message = new InvocationDescriptor + { + Method = method, + Arguments = args + }; - byte[] message = null; + var formatterFactory = _endPoint._serviceProvider.GetRequiredService(); foreach (var connection in _endPoint.Connections) { - if (message == null) - { - message = _endPoint.Pack(method, args); - } - - tasks.Add(connection.Channel.Output.WriteAsync(message)); + // TODO: separate serialization from writing to stream + var formatter = formatterFactory.CreateFormatter(connection.Metadata.Format, (string)connection.Metadata["formatType"]); + tasks.Add(formatter.WriteAsync(message, connection.Channel.GetStream())); } return Task.WhenAll(tasks); diff --git a/samples/SocketsSample/EndPoints/JsonRpcEndpoint.cs b/samples/SocketsSample/EndPoints/RpcEndpoint.cs similarity index 62% rename from samples/SocketsSample/EndPoints/JsonRpcEndpoint.cs rename to samples/SocketsSample/EndPoints/RpcEndpoint.cs index 0286739f51..f1b21427f0 100644 --- a/samples/SocketsSample/EndPoints/JsonRpcEndpoint.cs +++ b/samples/SocketsSample/EndPoints/RpcEndpoint.cs @@ -14,13 +14,15 @@ using Newtonsoft.Json.Linq; namespace SocketsSample { // This end point implementation is used for framing JSON objects from the stream - public class JsonRpcEndpoint : EndPoint + public class RpcEndpoint : EndPoint { - private readonly Dictionary> _callbacks = new Dictionary>(StringComparer.OrdinalIgnoreCase); - private readonly ILogger _logger; + private readonly Dictionary> _callbacks + = new Dictionary>(StringComparer.OrdinalIgnoreCase); + + private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; - public JsonRpcEndpoint(ILogger logger, IServiceProvider serviceProvider) + public RpcEndpoint(ILogger logger, IServiceProvider serviceProvider) { // TODO: Discover end points _logger = logger; @@ -31,7 +33,7 @@ namespace SocketsSample protected virtual void DiscoverEndpoints() { - RegisterJsonRPCEndPoint(typeof(Echo)); + RegisterRPCEndPoint(typeof(Echo)); } public override async Task OnConnected(Connection connection) @@ -39,10 +41,9 @@ namespace SocketsSample // TODO: Dispatch from the caller await Task.Yield(); - // DO real async reads - var stream = connection.Channel.GetStream(); - var reader = new JsonTextReader(new StreamReader(stream)); - reader.SupportMultipleContent = true; + var formatterFactory = _serviceProvider.GetRequiredService(); + var formatType = (string)connection.Metadata["formatType"]; + var formatter = formatterFactory.CreateFormatter(connection.Metadata.Format, formatType); while (true) { @@ -74,38 +75,34 @@ namespace SocketsSample if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Received JSON RPC request: {request}", request); + _logger.LogDebug("Received JSON RPC request: {request}", invocationDescriptor.ToString()); } - JObject response = null; - - Func callback; - if (_callbacks.TryGetValue(request.Value("method"), out callback)) + InvocationResultDescriptor result; + Func callback; + if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback)) { - response = callback(request); + result = callback(invocationDescriptor); } else { // If there's no method then return a failed response for this request - response = new JObject(); - response["id"] = request["id"]; - response["error"] = string.Format("Unknown method '{0}'", request.Value("method")); + result = new InvocationResultDescriptor + { + Id = invocationDescriptor.Id, + Error = $"Unknown method '{invocationDescriptor.Method}'" + }; } - _logger.LogDebug("Sending JSON RPC response: {data}", response); - - var writer = new JsonTextWriter(new StreamWriter(stream)); - response.WriteTo(writer); - writer.Flush(); + await formatter.WriteAsync(result, connection.Channel.GetStream()); } } protected virtual void Initialize(object endpoint) { - } - protected void RegisterJsonRPCEndPoint(Type type) + protected void RegisterRPCEndPoint(Type type) { var methods = new List(); @@ -127,10 +124,10 @@ namespace SocketsSample _logger.LogDebug("RPC method '{methodName}' is bound", methodName); } - _callbacks[methodName] = request => + _callbacks[methodName] = invocationDescriptor => { - var response = new JObject(); - response["id"] = request["id"]; + var invocationResult = new InvocationResultDescriptor(); + invocationResult.Id = invocationDescriptor.Id; var scopeFactory = _serviceProvider.GetRequiredService(); @@ -143,27 +140,23 @@ namespace SocketsSample try { - var args = request.Value("params").Zip(parameters, (a, p) => a.ToObject(p.ParameterType)) - .ToArray(); + var args = invocationDescriptor.Arguments + .Zip(parameters, (a, p) => Convert.ChangeType(a, p.ParameterType)) + .ToArray(); - var result = m.Invoke(value, args); - - if (result != null) - { - response["result"] = JToken.FromObject(result); - } + invocationResult.Result = m.Invoke(value, args); } catch (TargetInvocationException ex) { - response["error"] = ex.InnerException.Message; + invocationResult.Error = ex.InnerException.Message; } catch (Exception ex) { - response["error"] = ex.Message; + invocationResult.Error = ex.Message; } } - return response; + return invocationResult; }; }; } diff --git a/samples/SocketsSample/InvocationDescriptor.cs b/samples/SocketsSample/InvocationDescriptor.cs new file mode 100644 index 0000000000..2f5dcac8f3 --- /dev/null +++ b/samples/SocketsSample/InvocationDescriptor.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace SocketsSample +{ + public class InvocationDescriptor + { + public string Id { get; set; } + + public string Method { get; set; } + + public object[] Arguments { get; set; } + } +} diff --git a/samples/SocketsSample/InvocationDescriptorBuilder.cs b/samples/SocketsSample/InvocationDescriptorBuilder.cs new file mode 100644 index 0000000000..8ca8bb7ea3 --- /dev/null +++ b/samples/SocketsSample/InvocationDescriptorBuilder.cs @@ -0,0 +1,11 @@ +using System; +using System.IO; +using System.Threading.Tasks; + +namespace SocketsSample +{ + interface InvocationDescriptorBuilder + { + Task CreateInvocationDescriptor(Stream stream, Func getParams); + } +} diff --git a/samples/SocketsSample/InvocationResultDescriptor.cs b/samples/SocketsSample/InvocationResultDescriptor.cs new file mode 100644 index 0000000000..ef513bf47e --- /dev/null +++ b/samples/SocketsSample/InvocationResultDescriptor.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace SocketsSample +{ + public class InvocationResultDescriptor + { + public string Id { get; set; } + + public object Result { get; set; } + + public string Error { get; set; } + } +} diff --git a/samples/SocketsSample/Protobuf/ProtobufInvocationDescriptorBuilder.cs b/samples/SocketsSample/Protobuf/ProtobufInvocationDescriptorBuilder.cs new file mode 100644 index 0000000000..f61a4bcf4a --- /dev/null +++ b/samples/SocketsSample/Protobuf/ProtobufInvocationDescriptorBuilder.cs @@ -0,0 +1,49 @@ + +using System; +using System.IO; +using System.Reflection; +using System.Threading.Tasks; +using Google.Protobuf; + +namespace SocketsSample.Protobuf +{ + public class ProtobufInvocationDescriptorBuilder : InvocationDescriptorBuilder + { + public Task CreateInvocationDescriptor(Stream stream, Func getParams) + { + var invocationDescriptor = new InvocationDescriptor(); + var inputStream = new CodedInputStream(stream, leaveOpen: true); + var invocationHeader = new RpcInvocationHeader(); + inputStream.ReadMessage(invocationHeader); + var argumentTypes = getParams(invocationHeader.Name); + + invocationDescriptor.Method = invocationHeader.Name; + invocationDescriptor.Id = invocationHeader.Id.ToString(); + invocationDescriptor.Arguments = new object[argumentTypes.Length]; + + var primitiveValueParser = PrimitiveValue.Parser; + for (var i = 0; i < argumentTypes.Length; i++) + { + if (argumentTypes[i] == typeof(int)) + { + invocationDescriptor.Arguments[i] = primitiveValueParser.ParseFrom(inputStream).Int32Value; + } + else if (argumentTypes[i] == typeof(int)) + { + invocationDescriptor.Arguments[i] = primitiveValueParser.ParseFrom(inputStream).StringValue; + } + else if (typeof(IMessage).IsAssignableFrom(argumentTypes[i])) + { + throw new NotImplementedException(); + } + } + + return Task.FromResult(invocationDescriptor); + } + + public async Task WriteResult(Stream stream, InvocationResultDescriptor result) + { + throw new NotImplementedException(); + } + } +} diff --git a/samples/SocketsSample/Protobuf/RpcInvocation.cs b/samples/SocketsSample/Protobuf/RpcInvocation.cs new file mode 100644 index 0000000000..6b5202dcce --- /dev/null +++ b/samples/SocketsSample/Protobuf/RpcInvocation.cs @@ -0,0 +1,390 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: RpcInvocation.proto +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +/// Holder for reflection information generated from RpcInvocation.proto +public static partial class RpcInvocationReflection { + + #region Descriptor + /// File descriptor for RpcInvocation.proto + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static RpcInvocationReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "ChNScGNJbnZvY2F0aW9uLnByb3RvIkAKE1JwY0ludm9jYXRpb25IZWFkZXIS", + "DAoETmFtZRgBIAEoCRIKCgJJZBgCIAEoBRIPCgdOdW1BcmdzGAMgASgFIkcK", + "DlByaW1pdGl2ZVZhbHVlEhQKCkludDMyVmFsdWUYASABKAVIABIVCgtTdHJp", + "bmdWYWx1ZRgCIAEoCUgAQggKBm9uZW9mX2IGcHJvdG8z")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::RpcInvocationHeader), global::RpcInvocationHeader.Parser, new[]{ "Name", "Id", "NumArgs" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::PrimitiveValue), global::PrimitiveValue.Parser, new[]{ "Int32Value", "StringValue" }, new[]{ "Oneof" }, null, null) + })); + } + #endregion + +} +#region Messages +public sealed partial class RpcInvocationHeader : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new RpcInvocationHeader()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::RpcInvocationReflection.Descriptor.MessageTypes[0]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RpcInvocationHeader() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RpcInvocationHeader(RpcInvocationHeader other) : this() { + name_ = other.name_; + id_ = other.id_; + numArgs_ = other.numArgs_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RpcInvocationHeader Clone() { + return new RpcInvocationHeader(this); + } + + /// Field number for the "Name" field. + public const int NameFieldNumber = 1; + private string name_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string Name { + get { return name_; } + set { + name_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + /// Field number for the "Id" field. + public const int IdFieldNumber = 2; + private int id_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Id { + get { return id_; } + set { + id_ = value; + } + } + + /// Field number for the "NumArgs" field. + public const int NumArgsFieldNumber = 3; + private int numArgs_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int NumArgs { + get { return numArgs_; } + set { + numArgs_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as RpcInvocationHeader); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(RpcInvocationHeader other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Name != other.Name) return false; + if (Id != other.Id) return false; + if (NumArgs != other.NumArgs) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Name.Length != 0) hash ^= Name.GetHashCode(); + if (Id != 0) hash ^= Id.GetHashCode(); + if (NumArgs != 0) hash ^= NumArgs.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Name.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Name); + } + if (Id != 0) { + output.WriteRawTag(16); + output.WriteInt32(Id); + } + if (NumArgs != 0) { + output.WriteRawTag(24); + output.WriteInt32(NumArgs); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Name.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Name); + } + if (Id != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Id); + } + if (NumArgs != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(NumArgs); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(RpcInvocationHeader other) { + if (other == null) { + return; + } + if (other.Name.Length != 0) { + Name = other.Name; + } + if (other.Id != 0) { + Id = other.Id; + } + if (other.NumArgs != 0) { + NumArgs = other.NumArgs; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + Name = input.ReadString(); + break; + } + case 16: { + Id = input.ReadInt32(); + break; + } + case 24: { + NumArgs = input.ReadInt32(); + break; + } + } + } + } + +} + +public sealed partial class PrimitiveValue : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new PrimitiveValue()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::RpcInvocationReflection.Descriptor.MessageTypes[1]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PrimitiveValue() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PrimitiveValue(PrimitiveValue other) : this() { + switch (other.OneofCase) { + case OneofOneofCase.Int32Value: + Int32Value = other.Int32Value; + break; + case OneofOneofCase.StringValue: + StringValue = other.StringValue; + break; + } + + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PrimitiveValue Clone() { + return new PrimitiveValue(this); + } + + /// Field number for the "Int32Value" field. + public const int Int32ValueFieldNumber = 1; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Int32Value { + get { return oneofCase_ == OneofOneofCase.Int32Value ? (int) oneof_ : 0; } + set { + oneof_ = value; + oneofCase_ = OneofOneofCase.Int32Value; + } + } + + /// Field number for the "StringValue" field. + public const int StringValueFieldNumber = 2; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string StringValue { + get { return oneofCase_ == OneofOneofCase.StringValue ? (string) oneof_ : ""; } + set { + oneof_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + oneofCase_ = OneofOneofCase.StringValue; + } + } + + private object oneof_; + /// Enum of possible cases for the "oneof_" oneof. + public enum OneofOneofCase { + None = 0, + Int32Value = 1, + StringValue = 2, + } + private OneofOneofCase oneofCase_ = OneofOneofCase.None; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public OneofOneofCase OneofCase { + get { return oneofCase_; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void ClearOneof() { + oneofCase_ = OneofOneofCase.None; + oneof_ = null; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as PrimitiveValue); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(PrimitiveValue other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Int32Value != other.Int32Value) return false; + if (StringValue != other.StringValue) return false; + if (OneofCase != other.OneofCase) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (oneofCase_ == OneofOneofCase.Int32Value) hash ^= Int32Value.GetHashCode(); + if (oneofCase_ == OneofOneofCase.StringValue) hash ^= StringValue.GetHashCode(); + hash ^= (int) oneofCase_; + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (oneofCase_ == OneofOneofCase.Int32Value) { + output.WriteRawTag(8); + output.WriteInt32(Int32Value); + } + if (oneofCase_ == OneofOneofCase.StringValue) { + output.WriteRawTag(18); + output.WriteString(StringValue); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (oneofCase_ == OneofOneofCase.Int32Value) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Int32Value); + } + if (oneofCase_ == OneofOneofCase.StringValue) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(StringValue); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(PrimitiveValue other) { + if (other == null) { + return; + } + switch (other.OneofCase) { + case OneofOneofCase.Int32Value: + Int32Value = other.Int32Value; + break; + case OneofOneofCase.StringValue: + StringValue = other.StringValue; + break; + } + + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 8: { + Int32Value = input.ReadInt32(); + break; + } + case 18: { + StringValue = input.ReadString(); + break; + } + } + } + } + +} + +#endregion + + +#endregion Designer generated code diff --git a/samples/SocketsSample/Protobuf/RpcInvocation.proto b/samples/SocketsSample/Protobuf/RpcInvocation.proto new file mode 100644 index 0000000000..ddd5f331b7 --- /dev/null +++ b/samples/SocketsSample/Protobuf/RpcInvocation.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +message RpcInvocationHeader { + string Name = 1; + int32 Id = 2; + int32 NumArgs = 3; +} + +message PrimitiveValue { + oneof oneof_ { + int32 Int32Value = 1; + string StringValue = 2; + } +} \ No newline at end of file diff --git a/samples/SocketsSample/RpcFormatterFactory.cs b/samples/SocketsSample/RpcFormatterFactory.cs new file mode 100644 index 0000000000..20040353a4 --- /dev/null +++ b/samples/SocketsSample/RpcFormatterFactory.cs @@ -0,0 +1,24 @@ +using System; +using Microsoft.AspNetCore.Sockets; + +namespace SocketsSample +{ + public class RpcFormatterFactory : IFormatterFactory + { + public IFormatter CreateFormatter(Format format, string formatType) + { + if (format == Format.Text) + { + switch(formatType) + { + case "json": + return new RpcJSonFormatter(); + case "line": + return new RpcTextFormatter(); + } + } + + throw new InvalidOperationException($"No formatter for format '{format}' and formatType 'formatType'."); + } + } +} diff --git a/samples/SocketsSample/RpcJSonFormatter.cs b/samples/SocketsSample/RpcJSonFormatter.cs new file mode 100644 index 0000000000..06ac3f9080 --- /dev/null +++ b/samples/SocketsSample/RpcJSonFormatter.cs @@ -0,0 +1,27 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Sockets; +using Newtonsoft.Json; + +namespace SocketsSample +{ + public class RpcJSonFormatter : IFormatter + { + private JsonSerializer _serializer = new JsonSerializer(); + + public async Task ReadAsync(Stream stream) + { + var reader = new JsonTextReader(new StreamReader(stream)); + return await Task.Run(() => _serializer.Deserialize(reader)); + } + + public Task WriteAsync(T value, Stream stream) + { + var writer = new JsonTextWriter(new StreamWriter(stream)); + _serializer.Serialize(writer, value); + writer.Flush(); + return Task.FromResult(0); + } + } +} diff --git a/samples/SocketsSample/RpcTextFormatter.cs b/samples/SocketsSample/RpcTextFormatter.cs new file mode 100644 index 0000000000..430e561ae5 --- /dev/null +++ b/samples/SocketsSample/RpcTextFormatter.cs @@ -0,0 +1,58 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Sockets; + +namespace SocketsSample +{ + public class RpcTextFormatter : IFormatter + { + public async Task ReadAsync(Stream stream) + { + var streamReader = new StreamReader(stream); + var line = await streamReader.ReadLineAsync(); + var values = line.Split(','); + + object x = new InvocationDescriptor + { + Id = values[0].Substring(2), + Method = values[1].Substring(1), + Arguments = values.Skip(2).ToArray() + }; + + return (T)x; + } + + public async Task WriteAsync(T value, Stream stream) + { + var result = value as InvocationResultDescriptor; + if (result != null) + { + var msg = $"RI{result.Id}," + string.IsNullOrEmpty(result.Error) != null + ? $"E{result.Error}\n" + : $"R{result.Result.ToString()}\n"; + + await WriteAsync(stream, msg); + return; + } + + var invocation = value as InvocationDescriptor; + if (invocation != null) + { + var msg = $"CI{invocation.Id},M{invocation.Method},{string.Join(",", invocation.Arguments.Select(a => a.ToString()))}\n"; + await WriteAsync(stream, msg); + return; + } + + throw new NotImplementedException("Unsupported type"); + } + + private async Task WriteAsync(Stream stream, string msg) + { + var writer = new StreamWriter(stream); + await writer.WriteAsync(msg); + await writer.FlushAsync(); + } + } +} diff --git a/samples/SocketsSample/Startup.cs b/samples/SocketsSample/Startup.cs index 224379c195..87c3a42c41 100644 --- a/samples/SocketsSample/Startup.cs +++ b/samples/SocketsSample/Startup.cs @@ -1,6 +1,6 @@ using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Routing; +using Microsoft.AspNetCore.Sockets; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -15,8 +15,10 @@ namespace SocketsSample services.AddRouting(); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); + + services.AddSingleton(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. @@ -35,7 +37,7 @@ namespace SocketsSample { routes.MapSocketEndpoint("/hubs"); routes.MapSocketEndpoint("/chat"); - routes.MapSocketEndpoint("/jsonrpc"); + routes.MapSocketEndpoint("/jsonrpc"); }); } } diff --git a/samples/SocketsSample/wwwroot/hubs.html b/samples/SocketsSample/wwwroot/hubs.html index 65f72df842..a45acd8858 100644 --- a/samples/SocketsSample/wwwroot/hubs.html +++ b/samples/SocketsSample/wwwroot/hubs.html @@ -15,13 +15,33 @@ }; ws.onmessage = function (event) { - var response = JSON.parse(event.data); + + let response = {}; + + if (document.getElementById('formatType').value == 'line') + { + let parts = evt.data.split(','); + if (evt.data[0] == 'R') + { + response.Id = parts[0].slice(2); + response.Result = parts[1].slice(1); + } + else + { + response.Method = parts[1].slice(1); + response.Arguments = parts.slice(2).join(); + } + } + else + { + response = JSON.parse(evt.data); + } // Response if (typeof response.id === "number") { - var cb = calls[response.id]; + var cb = calls[response.Id]; - delete calls[response.id]; + delete calls[response.Id]; if (response.error) { cb.error(response.error); @@ -32,7 +52,7 @@ } else { // Reverse JSON RPC - methods[response.method](response.params); + methods[response.Method](response.Arguments); } }; @@ -40,10 +60,15 @@ console.log('Closed!'); }; - this.invoke = function (method, args) { + this.invoke = function (method, args, formatType) { return new Promise((resolve, reject) => { calls[id] = { success: resolve, error: reject }; - ws.send(JSON.stringify({ method: method, params: args, id: id })); + if (formatType == 'line') { + ws.send(`CI${id},M${method},${args.join()}\n`); + } + else { + ws.send(JSON.stringify({ method: method, arguments: args, id: id })); + } id++; }); }; @@ -55,7 +80,15 @@ document.addEventListener('DOMContentLoaded', () => { - var conn = new hubConnection(`ws://${document.location.host}/hubs/ws`); + let connectButton = document.getElementById('connect'); + connectButton.addEventListener('click', () => { + run(document.getElementById('format').value, document.getElementById('formatType').value); + connectButton.disabled = true; + }); + }); + + function run(format, formatType) { + var conn = new hubConnection(`ws://${document.location.host}/hubs/ws?format=${format}&formatType=${formatType}`); conn.on('Send', function (message) { var child = document.createElement('li'); @@ -66,7 +99,7 @@ document.getElementById('sendmessage').addEventListener('submit', event => { let data = document.getElementById('data').value; - conn.invoke('SocketsSample.Hubs.Chat.Send', [data]).catch(err => { + conn.invoke('SocketsSample.Hubs.Chat.Send', [data], formatType).catch(err => { var child = document.createElement('li'); child.style.color = 'red'; child.innerText = err; @@ -75,16 +108,29 @@ event.preventDefault(); }); - }); + };

WebSockets

+
+ + + + + +
- - + +
    diff --git a/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs index 663fab8c73..1da06ac2de 100644 --- a/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs @@ -61,6 +61,10 @@ namespace Microsoft.AspNetCore.Sockets state.Connection.Metadata["transport"] = "websockets"; state.Connection.Metadata.Format = format; + // TODO: this is wrong. + how does the user add their own metadata based on HttpContext + var formatType = (string)context.Request.Query["formatType"]; + state.Connection.Metadata["formatType"] = string.IsNullOrEmpty(formatType) ? "json" : formatType; + var ws = new WebSockets(state.Connection); await DoPersistentConnection(endpoint, ws, context, state.Connection); diff --git a/src/Microsoft.AspNetCore.Sockets/IFormatter.cs b/src/Microsoft.AspNetCore.Sockets/IFormatter.cs new file mode 100644 index 0000000000..4183571460 --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets/IFormatter.cs @@ -0,0 +1,12 @@ +using System.IO; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Sockets +{ + // TODO: Is this name too generic? + public interface IFormatter + { + Task ReadAsync(Stream stream); + Task WriteAsync(T value, Stream stream); + } +} diff --git a/src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs b/src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs new file mode 100644 index 0000000000..e7313d64a4 --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs @@ -0,0 +1,8 @@ +namespace Microsoft.AspNetCore.Sockets +{ + // TODO: Should the user implement this or just register their formatters? + public interface IFormatterFactory + { + IFormatter CreateFormatter(Format format, string formatType); + } +}