@davidfowl love dat Func<...,object,...>,object pattern

This commit is contained in:
Andrew Stanton-Nurse 2016-10-14 15:09:06 -07:00
parent a1c0970222
commit d2dbd473a0
3 changed files with 31 additions and 10 deletions

View File

@ -48,8 +48,9 @@ namespace Microsoft.Extensions.WebSockets
/// Runs the WebSocket receive loop, using the provided message handler.
/// </summary>
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <param name="state">A state parameter that will be passed to each invocation of <paramref name="messageHandler"/></param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
Task<WebSocketCloseResult> ExecuteAsync(Func<WebSocketFrame, Task> messageHandler);
Task<WebSocketCloseResult> ExecuteAsync(Func<WebSocketFrame, object, Task> messageHandler, object state);
}
public static class WebSocketConnectionExtensions
@ -74,9 +75,28 @@ namespace Microsoft.Extensions.WebSockets
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
public static Task<WebSocketCloseResult> ExecuteAsync(this IWebSocketConnection self, Action<WebSocketFrame> messageHandler) =>
self.ExecuteAsync(frame => {
self.ExecuteAsync((frame, _) => {
messageHandler(frame);
return Task.CompletedTask;
});
}, null);
/// <summary>
/// Runs the WebSocket receive loop, using the provided message handler.
/// </summary>
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
public static Task<WebSocketCloseResult> ExecuteAsync(this IWebSocketConnection self, Action<WebSocketFrame, object> messageHandler, object state) =>
self.ExecuteAsync((frame, s) => {
messageHandler(frame, s);
return Task.CompletedTask;
}, state);
/// <summary>
/// Runs the WebSocket receive loop, using the provided message handler.
/// </summary>
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
public static Task<WebSocketCloseResult> ExecuteAsync(this IWebSocketConnection self, Func<WebSocketFrame, Task> messageHandler) =>
self.ExecuteAsync((frame, _) => messageHandler(frame), null);
}
}

View File

@ -34,13 +34,13 @@ namespace Microsoft.Extensions.WebSockets
public static bool TryParse(ReadableBuffer payload, out WebSocketCloseResult result)
{
if(payload.Length == 0)
if (payload.Length == 0)
{
// Empty payload is OK
result = new WebSocketCloseResult(WebSocketCloseStatus.Empty, string.Empty);
return true;
}
else if(payload.Length < 2)
else if (payload.Length < 2)
{
result = default(WebSocketCloseResult);
return false;
@ -50,7 +50,7 @@ namespace Microsoft.Extensions.WebSockets
var status = payload.ReadBigEndian<ushort>();
var description = string.Empty;
payload = payload.Slice(2);
if(payload.Length > 0)
if (payload.Length > 0)
{
description = payload.GetUtf8String();
}

View File

@ -79,7 +79,7 @@ namespace Microsoft.Extensions.WebSockets
_terminateReceiveCts.Cancel();
}
public Task<WebSocketCloseResult> ExecuteAsync(Func<WebSocketFrame, Task> messageHandler)
public Task<WebSocketCloseResult> ExecuteAsync(Func<WebSocketFrame, object, Task> messageHandler, object state)
{
if (State == WebSocketConnectionState.Closed)
{
@ -91,7 +91,7 @@ namespace Microsoft.Extensions.WebSockets
throw new InvalidOperationException("Connection is already running.");
}
State = WebSocketConnectionState.Connected;
return Task.Run(() => ReceiveLoop(messageHandler, _terminateReceiveCts.Token));
return ReceiveLoop(messageHandler, state, _terminateReceiveCts.Token);
}
/// <summary>
@ -177,7 +177,7 @@ namespace Microsoft.Extensions.WebSockets
buffer.Set(_maskingKey);
}
private async Task<WebSocketCloseResult> ReceiveLoop(Func<WebSocketFrame, Task> messageHandler, CancellationToken cancellationToken)
private async Task<WebSocketCloseResult> ReceiveLoop(Func<WebSocketFrame, object, Task> messageHandler, object state, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
@ -309,7 +309,7 @@ namespace Microsoft.Extensions.WebSockets
}
else
{
await messageHandler(frame);
await messageHandler(frame, state);
}
// Mark the payload as consumed
@ -376,6 +376,7 @@ namespace Microsoft.Extensions.WebSockets
// Allocate a buffer
var buffer = _outbound.Alloc(minimumSize: allocSize);
Debug.Assert(buffer.Memory.Length >= allocSize);
if (buffer.Memory.Length < allocSize)
{
throw new InvalidOperationException("Couldn't allocate enough data from the channel to write the header");