From e018fe70f760a0d49daa3734c51eeac587672d33 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sun, 2 Oct 2016 23:41:06 -0700 Subject: [PATCH] Handle message fragments --- .../WebSockets.cs | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/Microsoft.AspNetCore.Sockets/WebSockets.cs b/src/Microsoft.AspNetCore.Sockets/WebSockets.cs index 79e1c7c2ed..0ff34a5fc7 100644 --- a/src/Microsoft.AspNetCore.Sockets/WebSockets.cs +++ b/src/Microsoft.AspNetCore.Sockets/WebSockets.cs @@ -35,25 +35,38 @@ namespace Microsoft.AspNetCore.Sockets _tcs.TrySetResult(null); - var buffer = new byte[2048]; + var outputBuffer = _channel.Input.Alloc(); while (!_channel.Input.Writing.IsCompleted) { - var result = await ws.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); + // Make sure there's room to read (at least 2k) + outputBuffer.Ensure(2048); - // TODO: Fragments - if (result.MessageType == WebSocketMessageType.Text) + ArraySegment segment; + if (!outputBuffer.Memory.TryGetArray(out segment)) { - await _channel.Input.WriteAsync(new Span(buffer, 0, result.Count)); + // REVIEW: Do we care about native buffers here? + throw new InvalidOperationException("Managed buffers are required for Web Socket API"); } - else if (result.MessageType == WebSocketMessageType.Binary) + + var result = await ws.ReceiveAsync(segment, CancellationToken.None); + + if (result.MessageType != WebSocketMessageType.Close) { - await _channel.Input.WriteAsync(new Span(buffer, 0, result.Count)); + outputBuffer.Advance(result.Count); + + if (result.EndOfMessage) + { + // Flush when we get an entire message + await outputBuffer.FlushAsync(); + + // Allocate a new buffer to further writing + outputBuffer = _channel.Input.Alloc(); + } } - else if (result.MessageType == WebSocketMessageType.Close) + else { await ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); - // TODO: needs to remove itself from connection mamanger? break; } }