Addressing review comments

This commit is contained in:
moozzyk 2016-10-26 12:09:48 -07:00
parent ca2a471691
commit ffc0a048e3
9 changed files with 45 additions and 38 deletions

View File

@ -8,15 +8,11 @@ namespace SocketsSample
{
public class ChatEndPoint : EndPoint
{
public ChatEndPoint()
{
}
public override async Task OnConnected(Connection connection)
{
await Broadcast($"{connection.ConnectionId} connected ({connection.Metadata["transport"]})");
while (true)
{
var result = await connection.Channel.Input.ReadAsync();

View File

@ -63,10 +63,12 @@ namespace SocketsSample
foreach (var connection in _endPoint.Connections)
{
var invocationAdapter = _endPoint._serviceProvider.GetRequiredService<InvocationAdapterRegistry>()
.GetInvocationAdapter((string)connection.Metadata["formatType"]);
var invocationAdapter =
_endPoint._serviceProvider
.GetRequiredService<InvocationAdapterRegistry>()
.GetInvocationAdapter((string)connection.Metadata["formatType"]);
tasks.Add(invocationAdapter.InvokeClientMethod(connection.Channel.GetStream(), message));
tasks.Add(invocationAdapter.WriteInvocationDescriptor(message, connection.Channel.GetStream()));
}
return Task.WhenAll(tasks);
@ -88,8 +90,10 @@ namespace SocketsSample
{
var connection = _endPoint.Connections[_connectionId];
var invocationAdapter = _endPoint._serviceProvider.GetRequiredService<InvocationAdapterRegistry>()
.GetInvocationAdapter((string)connection.Metadata["formatType"]);
var invocationAdapter =
_endPoint._serviceProvider
.GetRequiredService<InvocationAdapterRegistry>()
.GetInvocationAdapter((string)connection.Metadata["formatType"]);
if (_endPoint._logger.IsEnabled(LogLevel.Debug))
{
@ -102,7 +106,7 @@ namespace SocketsSample
Arguments = args
};
return invocationAdapter.InvokeClientMethod(connection.Channel.GetStream(), message);
return invocationAdapter.WriteInvocationDescriptor(message, connection.Channel.GetStream());
}
}
}

View File

@ -44,13 +44,15 @@ namespace SocketsSample
await Task.Yield();
var stream = connection.Channel.GetStream();
var invocationAdapter = _serviceProvider.GetRequiredService<InvocationAdapterRegistry>()
.GetInvocationAdapter((string)connection.Metadata["formatType"]);
var invocationAdapter =
_serviceProvider
.GetRequiredService<InvocationAdapterRegistry>()
.GetInvocationAdapter((string)connection.Metadata["formatType"]);
while (true)
{
var invocationDescriptor =
await invocationAdapter.CreateInvocationDescriptor(
await invocationAdapter.ReadInvocationDescriptor(
stream, methodName => {
Type[] types;
// TODO: null or throw?
@ -84,7 +86,7 @@ namespace SocketsSample
};
}
await invocationAdapter.WriteInvocationResult(stream, result);
await invocationAdapter.WriteInvocationResult(result, stream);
}
}

View File

@ -6,10 +6,10 @@ namespace SocketsSample
{
public interface IInvocationAdapter
{
Task<InvocationDescriptor> CreateInvocationDescriptor(Stream stream, Func<string, Type[]> getParams);
Task<InvocationDescriptor> ReadInvocationDescriptor(Stream stream, Func<string, Type[]> getParams);
Task WriteInvocationResult(Stream stream, InvocationResultDescriptor resultDescriptor);
Task WriteInvocationResult(InvocationResultDescriptor resultDescriptor, Stream stream);
Task InvokeClientMethod(Stream stream, InvocationDescriptor invocationDescriptor);
Task WriteInvocationDescriptor(InvocationDescriptor invocationDescriptor, Stream stream);
}
}

View File

@ -5,33 +5,33 @@ using Newtonsoft.Json;
namespace SocketsSample
{
public class JSonInvocationAdapter : IInvocationAdapter
public class JsonInvocationAdapter : IInvocationAdapter
{
private JsonSerializer _serializer = new JsonSerializer();
public JSonInvocationAdapter()
public JsonInvocationAdapter()
{
}
public async Task<InvocationDescriptor> CreateInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
public async Task<InvocationDescriptor> ReadInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
{
var reader = new JsonTextReader(new StreamReader(stream));
return await Task.Run(() => _serializer.Deserialize<InvocationDescriptor>(reader));
}
public Task WriteInvocationResult(Stream stream, InvocationResultDescriptor resultDescriptor)
public Task WriteInvocationResult(InvocationResultDescriptor resultDescriptor, Stream stream)
{
Write(stream, resultDescriptor);
Write(resultDescriptor, stream);
return Task.FromResult(0);
}
public Task InvokeClientMethod(Stream stream, InvocationDescriptor invocationDescriptor)
public Task WriteInvocationDescriptor(InvocationDescriptor invocationDescriptor, Stream stream)
{
Write(stream, invocationDescriptor);
Write(invocationDescriptor, stream);
return Task.FromResult(0);
}
private void Write(Stream stream, object value)
private void Write(object value, Stream stream)
{
var writer = new JsonTextWriter(new StreamWriter(stream));
_serializer.Serialize(writer, value);

View File

@ -8,7 +8,7 @@ namespace SocketsSample
{
public class LineInvocationAdapter : IInvocationAdapter
{
public async Task<InvocationDescriptor> CreateInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
public async Task<InvocationDescriptor> ReadInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
{
var streamReader = new StreamReader(stream);
var line = await streamReader.ReadLineAsync();
@ -29,25 +29,25 @@ namespace SocketsSample
};
}
public async Task InvokeClientMethod(Stream stream, InvocationDescriptor invocationDescriptor)
public async Task WriteInvocationDescriptor(InvocationDescriptor invocationDescriptor, Stream stream)
{
var msg = $"CI{invocationDescriptor.Id},M{invocationDescriptor.Method},{string.Join(",", invocationDescriptor.Arguments.Select(a => a.ToString()))}\n";
await WriteAsync(stream, msg);
await WriteAsync(msg, stream);
}
public async Task WriteInvocationResult(Stream stream, InvocationResultDescriptor resultDescriptor)
public async Task WriteInvocationResult(InvocationResultDescriptor resultDescriptor, Stream stream)
{
if (string.IsNullOrEmpty(resultDescriptor.Error))
{
await WriteAsync(stream, $"RI{resultDescriptor.Id},E{resultDescriptor.Error}\n");
await WriteAsync($"RI{resultDescriptor.Id},E{resultDescriptor.Error}\n", stream);
}
else
{
await WriteAsync(stream, $"RI{resultDescriptor.Id},R{(resultDescriptor.Result != null ? resultDescriptor.Result.ToString() : string.Empty)}\n");
await WriteAsync($"RI{resultDescriptor.Id},R{(resultDescriptor.Result != null ? resultDescriptor.Result.ToString() : string.Empty)}\n", stream);
}
}
private async Task WriteAsync(Stream stream, string msg)
private async Task WriteAsync(string msg, Stream stream)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(msg);

View File

@ -8,14 +8,14 @@ namespace SocketsSample.Protobuf
{
public class ProtobufInvocationAdapter : IInvocationAdapter
{
IServiceProvider _serviceProvider;
private IServiceProvider _serviceProvider;
public ProtobufInvocationAdapter(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task<InvocationDescriptor> CreateInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
public async Task<InvocationDescriptor> ReadInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
{
return await Task.Run(() => CreateInvocationDescriptorInt(stream, getParams));
}
@ -58,7 +58,7 @@ namespace SocketsSample.Protobuf
return Task.FromResult(invocationDescriptor);
}
public async Task WriteInvocationResult(Stream stream, InvocationResultDescriptor resultDescriptor)
public async Task WriteInvocationResult(InvocationResultDescriptor resultDescriptor, Stream stream)
{
var outputStream = new CodedOutputStream(stream, leaveOpen: true);
outputStream.WriteMessage(new RpcMessageKind() { MessageKind = RpcMessageKind.Types.Kind.Result });
@ -100,7 +100,7 @@ namespace SocketsSample.Protobuf
await stream.FlushAsync();
}
public async Task InvokeClientMethod(Stream stream, InvocationDescriptor invocationDescriptor)
public async Task WriteInvocationDescriptor(InvocationDescriptor invocationDescriptor, Stream stream)
{
var outputStream = new CodedOutputStream(stream, leaveOpen: true);
outputStream.WriteMessage(new RpcMessageKind() { MessageKind = RpcMessageKind.Types.Kind.Invocation });
@ -124,6 +124,12 @@ namespace SocketsSample.Protobuf
{
outputStream.WriteMessage(new PrimitiveValue { StringValue = (string)arg });
}
else
{
var serializer = _serviceProvider.GetRequiredService<ProtobufSerializer>();
var message = serializer.GetMessage(arg);
outputStream.WriteMessage(message);
}
}
outputStream.Flush();

View File

@ -47,7 +47,7 @@ namespace SocketsSample
app.UseRpc(invocationAdapters =>
{
invocationAdapters.AddInvocationAdapter("protobuf", new Protobuf.ProtobufInvocationAdapter(app.ApplicationServices));
invocationAdapters.AddInvocationAdapter("json", new JSonInvocationAdapter());
invocationAdapters.AddInvocationAdapter("json", new JsonInvocationAdapter());
invocationAdapters.AddInvocationAdapter("line", new LineInvocationAdapter());
});
}

View File

@ -37,7 +37,6 @@ namespace Microsoft.AspNetCore.Builder
public void MapSocketEndpoint<TEndPoint>(string path) where TEndPoint : EndPoint
{
_routes.AddPrefixRoute(path, new RouteHandler(c => _dispatcher.Execute<TEndPoint>(path, c)));
}
}
}