integrate Channels-based WebSockets into SignalR (#28)

This commit is contained in:
Andrew Stanton-Nurse 2016-11-15 15:05:45 -08:00 committed by GitHub
parent 5e2b267d9f
commit 2431c5925c
9 changed files with 128 additions and 108 deletions

2
.gitignore vendored
View File

@ -37,3 +37,5 @@ node_modules/
autobahnreports/
signalr-client.js
site.min.css
.idea/
.vscode/

View File

@ -1,6 +1,3 @@
{
"projects": [ "src", "test" ],
"sdk": {
"version": "1.0.0-preview2-003121"
}
"projects": [ "src", "test" ]
}

View File

@ -7,6 +7,7 @@ using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
namespace Microsoft.AspNetCore.Sockets
@ -15,11 +16,13 @@ namespace Microsoft.AspNetCore.Sockets
{
private readonly ConnectionManager _manager;
private readonly ChannelFactory _channelFactory;
private readonly ILoggerFactory _loggerFactory;
public HttpConnectionDispatcher(ConnectionManager manager, ChannelFactory factory)
public HttpConnectionDispatcher(ConnectionManager manager, ChannelFactory factory, ILoggerFactory loggerFactory)
{
_manager = manager;
_channelFactory = factory;
_loggerFactory = loggerFactory;
}
public async Task ExecuteAsync<TEndPoint>(string path, HttpContext context) where TEndPoint : EndPoint
@ -72,7 +75,7 @@ namespace Microsoft.AspNetCore.Sockets
var formatType = (string)context.Request.Query["formatType"];
state.Connection.Metadata["formatType"] = string.IsNullOrEmpty(formatType) ? "json" : formatType;
var ws = new WebSockets(state.Connection, format);
var ws = new WebSockets(state.Connection, format, _loggerFactory);
await DoPersistentConnection(endpoint, ws, context, state.Connection);

View File

@ -8,6 +8,7 @@ using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Routing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Builder
{
@ -18,7 +19,8 @@ namespace Microsoft.AspNetCore.Builder
var manager = new ConnectionManager();
var factory = new ChannelFactory();
var dispatcher = new HttpConnectionDispatcher(manager, factory);
var loggerFactory = app.ApplicationServices.GetService<ILoggerFactory>();
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory);
// Dispose the connection manager when application shutdown is triggered
var lifetime = app.ApplicationServices.GetRequiredService<IApplicationLifetime>();
@ -28,8 +30,7 @@ namespace Microsoft.AspNetCore.Builder
callback(new SocketRouteBuilder(routes, dispatcher));
// TODO: Use new low allocating websocket API
app.UseWebSockets();
app.UseWebSocketConnections();
app.UseRouter(routes.Build());
return app;
}

View File

@ -2,143 +2,155 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.WebSockets.Internal;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.WebSockets.Internal;
namespace Microsoft.AspNetCore.Sockets
{
public class WebSockets : IHttpTransport
{
private readonly HttpChannel _channel;
private readonly Connection _connection;
private readonly WebSocketMessageType _messageType;
private static readonly TimeSpan _closeTimeout = TimeSpan.FromSeconds(5);
private static readonly WebSocketAcceptContext EmptyContext = new WebSocketAcceptContext();
public WebSockets(Connection connection, Format format)
private readonly HttpChannel _channel;
private readonly WebSocketOpcode _opcode;
private readonly ILogger _logger;
public WebSockets(Connection connection, Format format, ILoggerFactory loggerFactory)
{
_connection = connection;
_channel = (HttpChannel)connection.Channel;
_messageType = format == Format.Binary ? WebSocketMessageType.Binary : WebSocketMessageType.Text;
_opcode = format == Format.Binary ? WebSocketOpcode.Binary : WebSocketOpcode.Text;
_logger = (ILogger)loggerFactory?.CreateLogger<WebSockets>() ?? NullLogger.Instance;
}
public async Task ProcessRequestAsync(HttpContext context)
{
if (!context.WebSockets.IsWebSocketRequest)
var feature = context.Features.Get<IHttpWebSocketConnectionFeature>();
if (feature == null || !feature.IsWebSocketRequest)
{
_logger.LogWarning("Unable to handle WebSocket request, there is no WebSocket feature available.");
return;
}
var ws = await context.WebSockets.AcceptWebSocketAsync();
// REVIEW: Should we track this task? Leaving things like this alive usually causes memory leaks :)
// The reason we don't await this is because the channel is disposed after this loop returns
// and the sending loop is waiting for the channel to end before doing anything
// We could do a 2 stage shutdown but that could complicate the code...
var sending = StartSending(ws);
var outputBuffer = _channel.Input.Alloc();
while (!_channel.Input.Writing.IsCompleted)
using (var ws = await feature.AcceptWebSocketConnectionAsync(EmptyContext))
{
// Make sure there's room to read (at least 2k)
outputBuffer.Ensure(2048);
_logger.LogInformation("Socket opened.");
ArraySegment<byte> segment;
if (!outputBuffer.Memory.TryGetArray(out segment))
// Begin sending and receiving. Receiving must be started first because ExecuteAsync enables SendAsync.
var receiving = ws.ExecuteAsync((frame, self) => ((WebSockets)self).HandleFrame(frame), this);
var sending = StartSending(ws);
// Wait for something to shut down.
var trigger = await Task.WhenAny(
receiving,
sending);
// What happened?
if (trigger == receiving)
{
// REVIEW: Do we care about native buffers here?
throw new InvalidOperationException("Managed buffers are required for Web Socket API");
}
// Shutting down because we received a close frame from the client.
// Complete the input writer so that the application knows there won't be any more input.
_logger.LogDebug("Client closed connection with status code '{0}' ({1}). Signaling end-of-input to application", receiving.Result.Status, receiving.Result.Description);
_channel.Input.CompleteWriter();
var result = await ws.ReceiveAsync(segment, CancellationToken.None);
// Wait for the application to finish sending.
_logger.LogDebug("Waiting for the application to finish sending data");
await sending;
if (result.MessageType != WebSocketMessageType.Close)
{
outputBuffer.Advance(result.Count);
// Flush the written data to the channel
await outputBuffer.FlushAsync();
// Allocate a new buffer to further writing
outputBuffer = _channel.Input.Alloc();
// Send the server's close frame
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure);
}
else
{
break;
// The application finished sending. We're not going to keep the connection open,
// so close it and wait for the client to ack the close
_channel.Input.CompleteWriter();
_logger.LogDebug("Application finished sending. Sending close frame.");
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure);
_logger.LogDebug("Waiting for the client to close the socket");
// TODO: Timeout.
await receiving;
}
}
await ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
_logger.LogInformation("Socket closed.");
}
private async Task StartSending(WebSocket ws)
private Task HandleFrame(WebSocketFrame frame)
{
while (true)
// Is this a frame we care about?
if (!frame.Opcode.IsMessage())
{
var result = await _channel.Output.ReadAsync();
var buffer = result.Buffer;
try
{
if (buffer.IsEmpty && result.IsCompleted)
{
break;
}
foreach (var memory in buffer)
{
ArraySegment<byte> data;
if (memory.TryGetArray(out data))
{
if (IsClosedOrClosedSent(ws))
{
break;
}
await ws.SendAsync(data, _messageType, endOfMessage: true, cancellationToken: CancellationToken.None);
}
}
}
catch (Exception)
{
// Error writing, probably closed
break;
}
finally
{
_channel.Output.Advance(buffer.End);
}
return TaskCache.CompletedTask;
}
// REVIEW: Should this ever happen?
if (!IsClosedOrClosedSent(ws))
{
// Close the output
await ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
}
LogFrame("Receiving", frame);
// Allocate space from the input channel
var outputBuffer = _channel.Input.Alloc();
// Append this buffer to the input channel
_logger.LogDebug($"Appending {frame.Payload.Length} bytes to Connection channel");
outputBuffer.Append(frame.Payload);
return outputBuffer.FlushAsync();
}
private static bool IsClosedOrClosedSent(WebSocket webSocket)
private void LogFrame(string action, WebSocketFrame frame)
{
var webSocketState = GetWebSocketState(webSocket);
return webSocketState == WebSocketState.Closed ||
webSocketState == WebSocketState.CloseSent ||
webSocketState == WebSocketState.Aborted;
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug(
$"{action} frame: Opcode={frame.Opcode}, Fin={frame.EndOfMessage}, Payload={frame.Payload.Length} bytes");
}
}
private static WebSocketState GetWebSocketState(WebSocket webSocket)
private async Task StartSending(IWebSocketConnection ws)
{
try
{
return webSocket.State;
while (true)
{
var result = await _channel.Output.ReadAsync();
var buffer = result.Buffer;
try
{
if (buffer.IsEmpty && result.IsCompleted)
{
break;
}
// Send the buffer in a frame
var frame = new WebSocketFrame(
endOfMessage: true,
opcode: _opcode,
payload: buffer);
LogFrame("Sending", frame);
await ws.SendAsync(frame);
}
catch (Exception ex)
{
_logger.LogError("Error writing frame to output: {0}", ex);
break;
}
finally
{
_channel.Output.Advance(buffer.End);
}
}
}
catch (ObjectDisposedException)
finally
{
return WebSocketState.Closed;
// No longer reading from the channel
_channel.Output.CompleteReader();
}
}
}

View File

@ -25,7 +25,11 @@
"Channels": "0.2.0-beta-*",
"Microsoft.AspNetCore.Hosting.Abstractions": "1.2.0-*",
"Microsoft.AspNetCore.Routing": "1.2.0-*",
"Microsoft.AspNetCore.WebSockets": "1.1.0-*",
"Microsoft.AspNetCore.WebSockets.Internal": "0.1.0-*",
"Microsoft.Extensions.TaskCache.Sources": {
"version": "1.2.0-*",
"type": "build"
},
"NETStandard.Library": "1.6.1-*"
},
"frameworks": {

View File

@ -34,7 +34,6 @@ namespace Microsoft.AspNetCore.Builder
{
throw new ArgumentNullException(nameof(options));
}
self.UseWebSocketConnections(channelFactory, options);
self.UseMiddleware<WebSocketConnectionMiddleware>(channelFactory, options);
}
}

View File

@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var manager = new ConnectionManager();
using (var factory = new ChannelFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory);
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();
var ms = new MemoryStream();
context.Request.Path = "/getid";
@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var factory = new ChannelFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory);
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();
context.Request.Path = "/send";
var values = new Dictionary<string, StringValues>();
@ -66,7 +66,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var manager = new ConnectionManager();
using (var factory = new ChannelFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory);
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();
context.Request.Path = "/send";
var values = new Dictionary<string, StringValues>();
@ -86,7 +86,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var manager = new ConnectionManager();
using (var factory = new ChannelFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory);
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();
context.Request.Path = "/send";
await Assert.ThrowsAsync<InvalidOperationException>(async () =>

View File

@ -1,4 +1,4 @@
{
{
"buildOptions": {
"warningsAsErrors": true
},
@ -7,6 +7,8 @@
"dotnet-test-xunit": "2.2.0-*",
"Microsoft.AspNetCore.Http": "1.2.0-*",
"Microsoft.AspNetCore.Sockets": "0.1.0-*",
"Microsoft.AspNetCore.Hosting": "1.1.0-*",
"Microsoft.AspNetCore.Server.Kestrel": "1.1.0-*",
"xunit": "2.2.0-*"
},
"frameworks": {