Small fixes

- Pass the Connection to each IHttpTransport
- FlushAsync after writing for WebListener
- Add metadata to Connection
- Added WebListener dependency
This commit is contained in:
David Fowler 2016-10-01 11:52:50 -07:00
parent 9f5ef70164
commit 2a369f40f6
7 changed files with 57 additions and 25 deletions

View File

@ -10,6 +10,7 @@
"Microsoft.AspNetCore.Diagnostics": "1.1.0-*",
"Microsoft.AspNetCore.StaticFiles": "1.1.0-*",
"Microsoft.AspNetCore.Server.IISIntegration": "1.1.0-*",
"Microsoft.AspNetCore.Server.WebListener": "0.1.0",
"Microsoft.AspNetCore.Server.Kestrel": "1.1.0-*",
"Microsoft.Extensions.Logging.Console": "1.1.0-*"
},

View File

@ -10,5 +10,6 @@ namespace Microsoft.AspNetCore.Sockets
{
public string ConnectionId { get; set; }
public IChannel Channel { get; set; }
public IDictionary<string, string> Metadata { get; } = new Dictionary<string, string>();
}
}

View File

@ -52,8 +52,7 @@ namespace Microsoft.AspNetCore.Sockets
{
// Get the connection state for the current http context
var connectionState = GetOrCreateConnection(context);
var channel = (HttpChannel)connectionState.Connection.Channel;
var sse = new ServerSentEvents(channel);
var sse = new ServerSentEvents(connectionState.Connection);
// Register this transport for disconnect
RegisterDisconnect(context, sse);
@ -76,8 +75,7 @@ namespace Microsoft.AspNetCore.Sockets
{
// Get the connection state for the current http context
var connectionState = GetOrCreateConnection(context);
var channel = (HttpChannel)connectionState.Connection.Channel;
var ws = new WebSockets(channel);
var ws = new WebSockets(connectionState.Connection);
// Register this transport for disconnect
RegisterDisconnect(context, ws);
@ -128,8 +126,7 @@ namespace Microsoft.AspNetCore.Sockets
var ignore = endpoint.OnConnected(connectionState.Connection);
}
var channel = (HttpChannel)connectionState.Connection.Channel;
var longPolling = new LongPolling(channel);
var longPolling = new LongPolling(connectionState.Connection);
// Register this transport for disconnect
RegisterDisconnect(context, longPolling);
@ -144,7 +141,7 @@ namespace Microsoft.AspNetCore.Sockets
private static void RegisterDisconnect(HttpContext context, IHttpTransport transport)
{
context.RequestAborted.Register(state => ((IHttpTransport)state).Abort(), transport);
context.RequestAborted.Register(state => ((IHttpTransport)state).CloseAsync(), transport);
}
private Task ProcessGetId(HttpContext context)

View File

@ -8,7 +8,17 @@ namespace Microsoft.AspNetCore.Sockets
{
public interface IHttpTransport
{
/// <summary>
/// Executes the transport
/// </summary>
/// <param name="context"></param>
/// <returns>A <see cref="Task"/> that completes when the transport has finished processing</returns>
Task ProcessRequest(HttpContext context);
void Abort();
/// <summary>
/// Completes the Task returned from ProcessRequest if not already complete
/// </summary>
/// <returns></returns>
Task CloseAsync();
}
}

View File

@ -10,14 +10,16 @@ namespace Microsoft.AspNetCore.Sockets
private readonly TaskCompletionSource<object> _initTcs = new TaskCompletionSource<object>();
private readonly TaskCompletionSource<object> _lifetime = new TaskCompletionSource<object>();
private readonly HttpChannel _channel;
private readonly Connection _connection;
private readonly TaskQueue _queue;
private HttpContext _context;
public LongPolling(HttpChannel channel)
public LongPolling(Connection connection)
{
_queue = new TaskQueue(_initTcs.Task);
_channel = channel;
_connection = connection;
_channel = (HttpChannel)connection.Channel;
}
public async Task ProcessRequest(HttpContext context)
@ -38,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets
if (buffer.IsEmpty && _channel.Output.Reading.IsCompleted)
{
Abort();
await CloseAsync();
return;
}
@ -51,11 +53,24 @@ namespace Microsoft.AspNetCore.Sockets
_channel.Output.Advance(buffer.End);
}
Abort();
await EndRequest();
}
public async void Abort()
public async Task CloseAsync()
{
await _queue.Enqueue(state =>
{
var context = (HttpContext)state;
// REVIEW: What happens if header was already?
context.Response.Headers["X-ASPNET-SOCKET-DISCONNECT"] = "1";
return Task.CompletedTask;
},
_context);
await EndRequest();
}
private async Task EndRequest()
{
// Drain the queue and don't let any new work enter
await _queue.Drain();
@ -66,6 +81,7 @@ namespace Microsoft.AspNetCore.Sockets
private Task Send(ReadableBuffer value)
{
// REVIEW: Can we avoid the closure here?
return _queue.Enqueue(state =>
{
var data = (ReadableBuffer)state;

View File

@ -7,17 +7,19 @@ namespace Microsoft.AspNetCore.Sockets
{
public class ServerSentEvents : IHttpTransport
{
private readonly TaskQueue _queue;
private readonly TaskCompletionSource<object> _initTcs = new TaskCompletionSource<object>();
private readonly TaskCompletionSource<object> _lifetime = new TaskCompletionSource<object>();
private readonly HttpChannel _channel;
private readonly Connection _connection;
private readonly TaskQueue _queue;
private HttpContext _context;
public ServerSentEvents(HttpChannel channel)
public ServerSentEvents(Connection connection)
{
_queue = new TaskQueue(_initTcs.Task);
_channel = channel;
_connection = connection;
_channel = (HttpChannel)connection.Channel;
var ignore = StartSending();
}
@ -34,7 +36,7 @@ namespace Microsoft.AspNetCore.Sockets
await _lifetime.Task;
}
public async void Abort()
public async Task CloseAsync()
{
// Drain the queue so no new work can enter
await _queue.Drain();
@ -66,7 +68,7 @@ namespace Microsoft.AspNetCore.Sockets
private Task Send(ReadableBuffer value)
{
return _queue.Enqueue(state =>
return _queue.Enqueue(async state =>
{
var data = (ReadableBuffer)state;
// TODO: Pooled buffers
@ -83,7 +85,8 @@ namespace Microsoft.AspNetCore.Sockets
at += data.Length;
buffer[at++] = (byte)'\n';
buffer[at++] = (byte)'\n';
return _context.Response.Body.WriteAsync(buffer, 0, at);
await _context.Response.Body.WriteAsync(buffer, 0, at);
await _context.Response.Body.FlushAsync();
},
value);
}

View File

@ -10,12 +10,14 @@ namespace Microsoft.AspNetCore.Sockets
public class WebSockets : IHttpTransport
{
private readonly HttpChannel _channel;
private readonly Connection _connection;
private readonly TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
private WebSocket _ws;
public WebSockets(HttpChannel channel)
public WebSockets(Connection connection)
{
_channel = channel;
_connection = connection;
_channel = (HttpChannel)connection.Channel;
var ignore = StartSending();
}
@ -34,7 +36,8 @@ namespace Microsoft.AspNetCore.Sockets
_tcs.TrySetResult(null);
var buffer = new byte[2048];
while (true)
while (!_channel.Input.Writing.IsCompleted)
{
var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
@ -49,17 +52,18 @@ namespace Microsoft.AspNetCore.Sockets
}
else if (result.MessageType == WebSocketMessageType.Close)
{
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
await ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
// TODO: needs to remove itself from connection mamanger?
break;
}
}
}
public async void Abort()
public async Task CloseAsync()
{
await _tcs.Task;
// REVIEW: Close output vs Close?
await _ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
}