From 9767dbd5c130f65b24c9117c0317be79002410ac Mon Sep 17 00:00:00 2001 From: Andrew Stanton-Nurse Date: Wed, 22 Feb 2017 11:50:37 -0800 Subject: [PATCH] fix #209 by converting to byte[] (#229) * fix #209 by converting to byte[] --- .../PersistentConnectionLifeTimeManager.cs | 3 +- .../SocialWeather/SocialWeatherEndPoint.cs | 4 +- .../EndPoints/MessagesEndPoint.cs | 13 ++-- .../RedisHubLifetimeManager.cs | 3 +- .../DefaultHubLifetimeManager.cs | 3 +- .../HubEndPoint.cs | 15 ++--- .../Connection.cs | 13 ++-- .../LongPollingTransport.cs | 17 ++--- .../ReadableBufferContent.cs | 29 --------- .../WebSocketsTransport.cs | 38 +++++------ .../Formatters/BinaryMessageFormatter.cs | 8 +-- .../ServerSentEventsMessageFormatter.cs | 64 +++++++++++++------ .../Formatters/TextMessageFormatter.cs | 25 +++----- .../Message.cs | 18 +++--- .../HttpConnectionDispatcher.cs | 4 +- .../Transports/LongPollingTransport.cs | 11 ++-- .../Transports/ServerSentEventsTransport.cs | 18 ++---- .../Transports/WebSocketsTransport.cs | 41 ++++++------ .../TestClient.cs | 10 +-- .../LongPollingTransportTests.cs | 1 - .../Formatters/BinaryMessageFormatterTests.cs | 2 +- .../ServerSentEventsMessageFormatterTests.cs | 2 +- .../Formatters/TextMessageFormatterTests.cs | 3 +- .../MessageTestUtils.cs | 10 +-- .../HttpConnectionDispatcherTests.cs | 8 +-- .../LongPollingTests.cs | 4 +- .../ServerSentEventsTests.cs | 4 +- .../WebSocketsTests.cs | 46 ++++++------- 28 files changed, 186 insertions(+), 231 deletions(-) delete mode 100644 src/Microsoft.AspNetCore.Sockets.Client/ReadableBufferContent.cs diff --git a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs index 5eab9266e4..a46cdcebf1 100644 --- a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs +++ b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs @@ -38,8 +38,7 @@ namespace SocialWeather var formatter = _formatterResolver.GetFormatter(connection.Metadata.Get("formatType")); var ms = new MemoryStream(); await formatter.WriteAsync(data, ms); - var buffer = ReadableBuffer.Create(ms.ToArray()).Preserve(); - await connection.Transport.Output.WriteAsync(new Message(buffer, MessageType.Binary, endOfMessage: true)); + await connection.Transport.Output.WriteAsync(new Message(ms.ToArray(), MessageType.Binary, endOfMessage: true)); } } diff --git a/samples/SocialWeather/SocialWeatherEndPoint.cs b/samples/SocialWeather/SocialWeatherEndPoint.cs index 659f2aea7f..14cf854e8a 100644 --- a/samples/SocialWeather/SocialWeatherEndPoint.cs +++ b/samples/SocialWeather/SocialWeatherEndPoint.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.IO; @@ -39,7 +39,7 @@ namespace SocialWeather { Message message = await connection.Transport.Input.ReadAsync(); var stream = new MemoryStream(); - await message.Payload.Buffer.CopyToAsync(stream); + await stream.WriteAsync(message.Payload, 0, message.Payload.Length); WeatherReport weatherReport = await formatter.ReadAsync(stream); await _lifetimeManager.SendToAllAsync(weatherReport); } diff --git a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs index cfa827a6fa..643f9a9c4b 100644 --- a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs +++ b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs @@ -26,11 +26,8 @@ namespace SocketsSample.EndPoints Message message; if (connection.Transport.Input.TryRead(out message)) { - using (message) - { - // We can avoid the copy here but we'll deal with that later - await Broadcast(message.Payload.Buffer, message.Type, message.EndOfMessage); - } + // We can avoid the copy here but we'll deal with that later + await Broadcast(message.Payload, message.Type, message.EndOfMessage); } } } @@ -44,17 +41,17 @@ namespace SocketsSample.EndPoints private Task Broadcast(string text) { - return Broadcast(ReadableBuffer.Create(Encoding.UTF8.GetBytes(text)), MessageType.Text, endOfMessage: true); + return Broadcast(Encoding.UTF8.GetBytes(text), MessageType.Text, endOfMessage: true); } - private Task Broadcast(ReadableBuffer payload, MessageType format, bool endOfMessage) + private Task Broadcast(byte[] payload, MessageType format, bool endOfMessage) { var tasks = new List(Connections.Count); foreach (var c in Connections) { tasks.Add(c.Transport.Output.WriteAsync(new Message( - payload.Preserve(), + payload, format, endOfMessage))); } diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs index 3a8af780b4..d744de4130 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs @@ -277,8 +277,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis private async Task WriteAsync(Connection connection, byte[] data) { - var buffer = ReadableBuffer.Create(data).Preserve(); - var message = new Message(buffer, MessageType.Text, endOfMessage: true); + var message = new Message(data, MessageType.Text, endOfMessage: true); while (await connection.Transport.Output.WaitToWriteAsync()) { diff --git a/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs index 1502876315..a2842441f0 100644 --- a/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR/DefaultHubLifetimeManager.cs @@ -129,8 +129,7 @@ namespace Microsoft.AspNetCore.SignalR var stream = new MemoryStream(); await invocationAdapter.WriteMessageAsync(invocation, stream); - var buffer = ReadableBuffer.Create(stream.ToArray()).Preserve(); - var message = new Message(buffer, MessageType.Text, endOfMessage: true); + var message = new Message(stream.ToArray(), MessageType.Text, endOfMessage: true); while (await connection.Transport.Output.WaitToWriteAsync()) { diff --git a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs index d3faf14bfc..d7becbb1dd 100644 --- a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs +++ b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs @@ -151,17 +151,13 @@ namespace Microsoft.AspNetCore.SignalR { while (await connection.Transport.Input.WaitToReadAsync(cts.Token)) { - Message incomingMessage; - while (connection.Transport.Input.TryRead(out incomingMessage)) + while (connection.Transport.Input.TryRead(out var incomingMessage)) { InvocationDescriptor invocationDescriptor; - using (incomingMessage) - { - var inputStream = new MemoryStream(incomingMessage.Payload.Buffer.ToArray()); + var inputStream = new MemoryStream(incomingMessage.Payload); - // TODO: Handle receiving InvocationResultDescriptor - invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor; - } + // TODO: Handle receiving InvocationResultDescriptor + invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor; // Is there a better way of detecting that a connection was closed? if (invocationDescriptor == null) @@ -233,8 +229,7 @@ namespace Microsoft.AspNetCore.SignalR var outStream = new MemoryStream(); await invocationAdapter.WriteMessageAsync(result, outStream); - var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve(); - var outMessage = new Message(buffer, MessageType.Text, endOfMessage: true); + var outMessage = new Message(outStream.ToArray(), MessageType.Text, endOfMessage: true); while (await connection.Transport.Output.WaitToWriteAsync()) { diff --git a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs index 1a557275d0..e03e6ca343 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs @@ -159,14 +159,11 @@ namespace Microsoft.AspNetCore.Sockets.Client { if (Input.TryRead(out Message message)) { - using (message) + // Do not "simplify" - events can be removed from a different thread + var receivedEventHandler = Received; + if (receivedEventHandler != null) { - // Do not "simplify" - events can be removed from a different thread - var receivedEventHandler = Received; - if (receivedEventHandler != null) - { - receivedEventHandler(message.Payload.Buffer.ToArray(), message.Type); - } + receivedEventHandler(message.Payload, message.Type); } } } @@ -199,7 +196,7 @@ namespace Microsoft.AspNetCore.Sockets.Client return false; } - var message = new Message(ReadableBuffer.Create(data).Preserve(), type); + var message = new Message(data, type); while (await Output.WaitToWriteAsync(cancellationToken)) { diff --git a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs index 6be53030db..c223dcbc5e 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs @@ -85,7 +85,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { var ms = new MemoryStream(); await response.Content.CopyToAsync(ms); - var message = new Message(ReadableBuffer.Create(ms.ToArray()).Preserve(), MessageType.Text); + var message = new Message(ms.ToArray(), MessageType.Text); while (await _application.Output.WaitToWriteAsync(cancellationToken)) { @@ -121,15 +121,16 @@ namespace Microsoft.AspNetCore.Sockets.Client { while (!cancellationToken.IsCancellationRequested && _application.Input.TryRead(out Message message)) { - using (message) - { - var request = new HttpRequestMessage(HttpMethod.Post, sendUrl); - request.Headers.UserAgent.Add(DefaultUserAgentHeader); - request.Content = new ReadableBufferContent(message.Payload.Buffer); + var request = new HttpRequestMessage(HttpMethod.Post, sendUrl); + request.Headers.UserAgent.Add(DefaultUserAgentHeader); - var response = await _httpClient.SendAsync(request); - response.EnsureSuccessStatusCode(); + if (message.Payload != null && message.Payload.Length > 0) + { + request.Content = new ByteArrayContent(message.Payload); } + + var response = await _httpClient.SendAsync(request); + response.EnsureSuccessStatusCode(); } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client/ReadableBufferContent.cs b/src/Microsoft.AspNetCore.Sockets.Client/ReadableBufferContent.cs deleted file mode 100644 index f4171f5220..0000000000 --- a/src/Microsoft.AspNetCore.Sockets.Client/ReadableBufferContent.cs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System.IO; -using System.IO.Pipelines; -using System.Net; -using System.Net.Http; -using System.Threading.Tasks; - -namespace Microsoft.AspNetCore.Sockets.Client -{ - internal class ReadableBufferContent : HttpContent - { - private ReadableBuffer _buffer; - - public ReadableBufferContent(ReadableBuffer buffer) - { - _buffer = buffer; - } - - protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) => _buffer.CopyToAsync(stream); - - protected override bool TryComputeLength(out long length) - { - length = _buffer.Length; - return true; - } - } -} diff --git a/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs index 2af16767b3..2d490abca0 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs @@ -51,7 +51,8 @@ namespace Microsoft.AspNetCore.Sockets.Client // TODO: Handle TCP connection errors // https://github.com/SignalR/SignalR/blob/1fba14fa3437e24c204dfaf8a18db3fce8acad3c/src/Microsoft.AspNet.SignalR.Core/Owin/WebSockets/WebSocketHandler.cs#L248-L251 - Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t => { + Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t => + { _application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); return t; }).Unwrap(); @@ -71,7 +72,7 @@ namespace Microsoft.AspNetCore.Sockets.Client //Exceptions are handled above where the send and receive tasks are being run. receiveResult = await _webSocket.ReceiveAsync(buffer, cancellationToken); - if(receiveResult.MessageType == WebSocketMessageType.Close) + if (receiveResult.MessageType == WebSocketMessageType.Close) { _application.Output.Complete(); return; @@ -82,7 +83,7 @@ namespace Microsoft.AspNetCore.Sockets.Client } while (!receiveResult.EndOfMessage); //Making sure the message type is either text or binary - Debug.Assert((receiveResult.MessageType == WebSocketMessageType.Binary || receiveResult.MessageType == WebSocketMessageType.Text ), "Unexpected message type"); + Debug.Assert((receiveResult.MessageType == WebSocketMessageType.Binary || receiveResult.MessageType == WebSocketMessageType.Text), "Unexpected message type"); Message message; var messageType = receiveResult.MessageType == WebSocketMessageType.Binary ? MessageType.Binary : MessageType.Text; @@ -90,17 +91,19 @@ namespace Microsoft.AspNetCore.Sockets.Client { var messageBuffer = new byte[totalBytes]; var offset = 0; - for (var i = 0 ; i < incomingMessage.Count; i++) + for (var i = 0; i < incomingMessage.Count; i++) { Buffer.BlockCopy(incomingMessage[i].Array, 0, messageBuffer, offset, incomingMessage[i].Count); offset += incomingMessage[i].Count; } - message = new Message(ReadableBuffer.Create(messageBuffer).Preserve(), messageType, receiveResult.EndOfMessage); + message = new Message(messageBuffer, messageType, receiveResult.EndOfMessage); } else { - message = new Message(ReadableBuffer.Create(incomingMessage[0].Array, incomingMessage[0].Offset, incomingMessage[0].Count).Preserve(), messageType, receiveResult.EndOfMessage); + var buffer = new byte[incomingMessage[0].Count]; + Buffer.BlockCopy(incomingMessage[0].Array, incomingMessage[0].Offset, buffer, 0, incomingMessage[0].Count); + message = new Message(buffer, messageType, receiveResult.EndOfMessage); } while (await _application.Output.WaitToWriteAsync(cancellationToken)) @@ -121,20 +124,17 @@ namespace Microsoft.AspNetCore.Sockets.Client Message message; while (_application.Input.TryRead(out message)) { - using (message) + try { - try - { - await _webSocket.SendAsync(new ArraySegment(message.Payload.Buffer.ToArray()), - message.Type == MessageType.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary, true, - cancellationToken); - } - catch (OperationCanceledException ex) - { - _logger?.LogError(ex.Message); - await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken); - break; - } + await _webSocket.SendAsync(new ArraySegment(message.Payload), + message.Type == MessageType.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary, true, + cancellationToken); + } + catch (OperationCanceledException ex) + { + _logger?.LogError(ex.Message); + await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken); + break; } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Common/Formatters/BinaryMessageFormatter.cs b/src/Microsoft.AspNetCore.Sockets.Common/Formatters/BinaryMessageFormatter.cs index 4150f84005..50f6dbe861 100644 --- a/src/Microsoft.AspNetCore.Sockets.Common/Formatters/BinaryMessageFormatter.cs +++ b/src/Microsoft.AspNetCore.Sockets.Common/Formatters/BinaryMessageFormatter.cs @@ -17,14 +17,14 @@ namespace Microsoft.AspNetCore.Sockets.Formatters internal static bool TryFormatMessage(Message message, Span buffer, out int bytesWritten) { // We can check the size needed right up front! - var sizeNeeded = sizeof(long) + 1 + message.Payload.Buffer.Length; + var sizeNeeded = sizeof(long) + 1 + message.Payload.Length; if (buffer.Length < sizeNeeded) { bytesWritten = 0; return false; } - buffer.WriteBigEndian((long)message.Payload.Buffer.Length); + buffer.WriteBigEndian((long)message.Payload.Length); if (!TryFormatType(message.Type, buffer.Slice(sizeof(long), 1))) { bytesWritten = 0; @@ -33,7 +33,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters buffer = buffer.Slice(sizeof(long) + 1); - message.Payload.Buffer.CopyTo(buffer); + message.Payload.CopyTo(buffer); bytesWritten = sizeNeeded; return true; } @@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters var buf = new byte[length]; buffer.Slice(sizeof(long) + 1, length).CopyTo(buf); - message = new Message(ReadableBuffer.Create(buf).Preserve(), messageType, endOfMessage: true); + message = new Message(buf, messageType, endOfMessage: true); bytesConsumed = sizeof(long) + 1 + length; return true; } diff --git a/src/Microsoft.AspNetCore.Sockets.Common/Formatters/ServerSentEventsMessageFormatter.cs b/src/Microsoft.AspNetCore.Sockets.Common/Formatters/ServerSentEventsMessageFormatter.cs index f572ff905e..5fd40034a0 100644 --- a/src/Microsoft.AspNetCore.Sockets.Common/Formatters/ServerSentEventsMessageFormatter.cs +++ b/src/Microsoft.AspNetCore.Sockets.Common/Formatters/ServerSentEventsMessageFormatter.cs @@ -2,7 +2,6 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.IO.Pipelines; using System.Text; namespace Microsoft.AspNetCore.Sockets.Formatters @@ -47,7 +46,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters buffer = buffer.Slice(Newline.Length); // Write the payload - if (!TryFormatPayload(message.Payload.Buffer, message.Type, buffer, out var writtenForPayload)) + if (!TryFormatPayload(message.Payload, message.Type, buffer, out var writtenForPayload)) { bytesWritten = 0; return false; @@ -65,7 +64,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters return true; } - private static bool TryFormatPayload(ReadableBuffer payload, MessageType type, Span buffer, out int bytesWritten) + private static bool TryFormatPayload(ReadOnlySpan payload, MessageType type, Span buffer, out int bytesWritten) { // Short-cut for empty payload if (payload.Length == 0) @@ -98,27 +97,54 @@ namespace Microsoft.AspNetCore.Sockets.Formatters } else { - while (true) + // We can't just use while(payload.Length > 0) because we need to write a blank final "data: " line + // if the payload ends in a newline. For example, consider the following payload: + // "Hello\n" + // It needs to be written as: + // data: Hello\r\n + // data: \r\n + // \r\n + // Since we slice past the newline when we find it, after writing "Hello" in the previous example, we'll + // end up with an empty payload buffer, BUT we need to write it as an empty 'data:' line, so we need + // to use a condition that ensure the only time we stop writing is when we write the slice after the final + // newline. + var keepWriting = true; + while (keepWriting) { // Seek to the end of buffer or newline - var sliced = payload.TrySliceTo(LineFeed, out var slice, out var cursor); + var sliceEnd = payload.IndexOf(LineFeed); + var nextSliceStart = sliceEnd + 1; + if (sliceEnd < 0) + { + sliceEnd = payload.Length; + nextSliceStart = sliceEnd + 1; - if (!TryFormatLine(sliced ? slice : payload, buffer, out var writtenByLine)) + // This is the last span + keepWriting = false; + } + if (sliceEnd > 0 && payload[sliceEnd - 1] == '\r') + { + sliceEnd--; + } + + var slice = payload.Slice(0, sliceEnd); + + if (nextSliceStart >= payload.Length) + { + payload = Span.Empty; + } + else + { + payload = payload.Slice(nextSliceStart); + } + + if (!TryFormatLine(slice, buffer, out var writtenByLine)) { bytesWritten = 0; return false; } buffer = buffer.Slice(writtenByLine); writtenSoFar += writtenByLine; - - if (sliced) - { - payload = payload.Slice(payload.Move(cursor, 1)); - } - else - { - break; - } } } @@ -126,13 +152,13 @@ namespace Microsoft.AspNetCore.Sockets.Formatters return true; } - private static bool TryFormatLine(ReadableBuffer slice, Span buffer, out int bytesWritten) + private static bool TryFormatLine(ReadOnlySpan line, Span buffer, out int bytesWritten) { // We're going to write the whole thing. HOWEVER, if the last byte is a '\r', we want to truncate it // because it was the '\r' in a '\r\n' newline sequence // This won't require an additional byte in the buffer because after this line we have to write a newline sequence anyway. var writtenSoFar = 0; - if (buffer.Length < DataPrefix.Length + slice.Length) + if (buffer.Length < DataPrefix.Length + line.Length) { bytesWritten = 0; return false; @@ -141,8 +167,8 @@ namespace Microsoft.AspNetCore.Sockets.Formatters writtenSoFar += DataPrefix.Length; buffer = buffer.Slice(DataPrefix.Length); - slice.CopyTo(buffer); - var sliceTo = slice.Length; + line.CopyTo(buffer); + var sliceTo = line.Length; if (sliceTo > 0 && buffer[sliceTo - 1] == '\r') { sliceTo -= 1; diff --git a/src/Microsoft.AspNetCore.Sockets.Common/Formatters/TextMessageFormatter.cs b/src/Microsoft.AspNetCore.Sockets.Common/Formatters/TextMessageFormatter.cs index 6c324d6f1b..fc5d9b2599 100644 --- a/src/Microsoft.AspNetCore.Sockets.Common/Formatters/TextMessageFormatter.cs +++ b/src/Microsoft.AspNetCore.Sockets.Common/Formatters/TextMessageFormatter.cs @@ -19,10 +19,10 @@ namespace Microsoft.AspNetCore.Sockets.Formatters internal static bool TryFormatMessage(Message message, Span buffer, out int bytesWritten) { // Calculate the length, it's the number of characters for text messages, but number of base64 characters for binary - var length = message.Payload.Buffer.Length; + var length = message.Payload.Length; if (message.Type == MessageType.Binary) { - length = (int)(4 * Math.Ceiling(((double)message.Payload.Buffer.Length / 3))); + length = (int)(4 * Math.Ceiling(((double)message.Payload.Length / 3))); } // Write the length as a string @@ -58,7 +58,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters { // Encode the payload. For now, we make it an array and use the old-fashioned types because we need to mirror packages // I've filed https://github.com/aspnet/SignalR/issues/192 to update this. -anurse - var payload = Convert.ToBase64String(message.Payload.Buffer.ToArray()); + var payload = Convert.ToBase64String(message.Payload); if (!TextEncoder.Utf8.TryEncode(payload, buffer, out int payloadWritten)) { bytesWritten = 0; @@ -69,14 +69,14 @@ namespace Microsoft.AspNetCore.Sockets.Formatters } else { - if (buffer.Length < message.Payload.Buffer.Length) + if (buffer.Length < message.Payload.Length) { bytesWritten = 0; return false; } - message.Payload.Buffer.CopyTo(buffer.Slice(0, message.Payload.Buffer.Length)); - written += message.Payload.Buffer.Length; - buffer = buffer.Slice(message.Payload.Buffer.Length); + message.Payload.CopyTo(buffer.Slice(0, message.Payload.Length)); + written += message.Payload.Length; + buffer = buffer.Slice(message.Payload.Length); } // Terminator @@ -165,14 +165,13 @@ namespace Microsoft.AspNetCore.Sockets.Formatters // Parse the payload. For now, we make it an array and use the old-fashioned types. // I've filed https://github.com/aspnet/SignalR/issues/192 to update this. -anurse - var payloadArray = payloadBuffer.ToArray(); - PreservedBuffer payload; + var payload = payloadBuffer.ToArray(); if (messageType == MessageType.Binary) { byte[] decoded; try { - var str = Encoding.UTF8.GetString(payloadArray); + var str = Encoding.UTF8.GetString(payload); decoded = Convert.FromBase64String(str); } catch @@ -182,11 +181,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters bytesConsumed = 0; return false; } - payload = ReadableBuffer.Create(decoded).Preserve(); - } - else - { - payload = ReadableBuffer.Create(payloadArray).Preserve(); + payload = decoded; } // Verify the trailer diff --git a/src/Microsoft.AspNetCore.Sockets.Common/Message.cs b/src/Microsoft.AspNetCore.Sockets.Common/Message.cs index 2413b5eb3e..c8c8f51fc4 100644 --- a/src/Microsoft.AspNetCore.Sockets.Common/Message.cs +++ b/src/Microsoft.AspNetCore.Sockets.Common/Message.cs @@ -2,32 +2,30 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.IO.Pipelines; namespace Microsoft.AspNetCore.Sockets { - public struct Message : IDisposable + public struct Message { public bool EndOfMessage { get; } public MessageType Type { get; } - public PreservedBuffer Payload { get; } - public Message(PreservedBuffer payload, MessageType type) + // REVIEW: We need a better primitive to use here. Memory would be good, + // but @davidfowl has concerns about allocating OwnedMemory and how to dispose + // it properly + public byte[] Payload { get; } + + public Message(byte[] payload, MessageType type) : this(payload, type, endOfMessage: true) { } - public Message(PreservedBuffer payload, MessageType type, bool endOfMessage) + public Message(byte[] payload, MessageType type, bool endOfMessage) { Type = type; EndOfMessage = endOfMessage; Payload = payload; } - - public void Dispose() - { - Payload.Dispose(); - } } } diff --git a/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs index 514c11703b..4f7ca13971 100644 --- a/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; @@ -331,7 +331,7 @@ namespace Microsoft.AspNetCore.Sockets : MessageType.Text; var message = new Message( - ReadableBuffer.Create(buffer).Preserve(), + buffer, format, endOfMessage: true); diff --git a/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs index eec753f472..4fe57bb88a 100644 --- a/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; @@ -37,12 +37,9 @@ namespace Microsoft.AspNetCore.Sockets.Transports Message message; if (_application.TryRead(out message)) { - using (message) - { - _logger.LogDebug("Writing {0} byte message to response", message.Payload.Buffer.Length); - context.Response.ContentLength = message.Payload.Buffer.Length; - await message.Payload.Buffer.CopyToAsync(context.Response.Body); - } + _logger.LogDebug("Writing {0} byte message to response", message.Payload.Length); + context.Response.ContentLength = message.Payload.Length; + await context.Response.Body.WriteAsync(message.Payload, 0, message.Payload.Length); } } catch (OperationCanceledException) diff --git a/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs index 4863ccc396..0d3c7a13ae 100644 --- a/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; @@ -33,13 +33,9 @@ namespace Microsoft.AspNetCore.Sockets.Transports { while (await _application.WaitToReadAsync(token)) { - Message message; - while (_application.TryRead(out message)) + while (_application.TryRead(out var message)) { - using (message) - { - await Send(context, message); - } + await Send(context, message); } } } @@ -53,8 +49,8 @@ namespace Microsoft.AspNetCore.Sockets.Transports { // TODO: Pooled buffers // 8 = 6(data: ) + 2 (\n\n) - _logger.LogDebug("Sending {0} byte message to Server-Sent Events client", message.Payload.Buffer.Length); - var buffer = new byte[8 + message.Payload.Buffer.Length]; + _logger.LogDebug("Sending {0} byte message to Server-Sent Events client", message.Payload.Length); + var buffer = new byte[8 + message.Payload.Length]; var at = 0; buffer[at++] = (byte)'d'; buffer[at++] = (byte)'a'; @@ -62,8 +58,8 @@ namespace Microsoft.AspNetCore.Sockets.Transports buffer[at++] = (byte)'a'; buffer[at++] = (byte)':'; buffer[at++] = (byte)' '; - message.Payload.Buffer.CopyTo(new Span(buffer, at, message.Payload.Buffer.Length)); - at += message.Payload.Buffer.Length; + message.Payload.CopyTo(new Span(buffer, at, message.Payload.Length)); + at += message.Payload.Length; buffer[at++] = (byte)'\n'; buffer[at++] = (byte)'\n'; await context.Response.Body.WriteAsync(buffer, 0, at); diff --git a/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs index 9382612e37..4ec6667d99 100644 --- a/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs @@ -3,6 +3,7 @@ using System; using System.Diagnostics; +using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Http; @@ -137,7 +138,8 @@ namespace Microsoft.AspNetCore.Sockets.Transports } // Create a Message for the frame - var message = new Message(frame.Payload.Preserve(), effectiveOpcode == WebSocketOpcode.Binary ? MessageType.Binary : MessageType.Text, frame.EndOfMessage); + // This has to copy the buffer :(. + var message = new Message(frame.Payload.ToArray(), effectiveOpcode == WebSocketOpcode.Binary ? MessageType.Binary : MessageType.Text, frame.EndOfMessage); // Write the message to the channel return _application.Output.WriteAsync(message); @@ -160,31 +162,28 @@ namespace Microsoft.AspNetCore.Sockets.Transports Message message; while (_application.Input.TryRead(out message)) { - using (message) + if (message.Payload.Length > 0) { - if (message.Payload.Buffer.Length > 0) + try { - try - { - var opcode = message.Type == MessageType.Binary ? - WebSocketOpcode.Binary : - WebSocketOpcode.Text; + var opcode = message.Type == MessageType.Binary ? + WebSocketOpcode.Binary : + WebSocketOpcode.Text; - var frame = new WebSocketFrame( - endOfMessage: message.EndOfMessage, - opcode: _lastFrameIncomplete ? WebSocketOpcode.Continuation : opcode, - payload: message.Payload.Buffer); + var frame = new WebSocketFrame( + endOfMessage: message.EndOfMessage, + opcode: _lastFrameIncomplete ? WebSocketOpcode.Continuation : opcode, + payload: ReadableBuffer.Create(message.Payload)); - _lastFrameIncomplete = !message.EndOfMessage; + _lastFrameIncomplete = !message.EndOfMessage; - LogFrame("Sending", frame); - await ws.SendAsync(frame); - } - catch (Exception ex) - { - _logger.LogError("Error writing frame to output: {0}", ex); - break; - } + LogFrame("Sending", frame); + await ws.SendAsync(frame); + } + catch (Exception ex) + { + _logger.LogError("Error writing frame to output: {0}", ex); + break; } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs index c8ccf0f3f7..cde22d8340 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs @@ -63,8 +63,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests }, stream); - var buffer = ReadableBuffer.Create(stream.ToArray()).Preserve(); - await Application.Output.WriteAsync(new Message(buffer, MessageType.Binary, endOfMessage: true)); + await Application.Output.WriteAsync(new Message(stream.ToArray(), MessageType.Binary, endOfMessage: true)); } public async Task Read() where T : InvocationMessage @@ -87,11 +86,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests Message message; if (Application.Input.TryRead(out message)) { - using (message) - { - var value = await _adapter.ReadMessageAsync(new MemoryStream(message.Payload.Buffer.ToArray()), _binder); - return value as T; - } + var value = await _adapter.ReadMessageAsync(new MemoryStream(message.Payload), _binder); + return value as T; } return null; diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs index 33ca5b2b20..bdc2f0915f 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/LongPollingTransportTests.cs @@ -154,7 +154,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests // The channel needs to be drained for the Completion task to be completed while (transportToConnection.In.TryRead(out Message message)) { - message.Dispose(); } var exception = await Assert.ThrowsAsync(async () => await transportToConnection.In.Completion); diff --git a/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/BinaryMessageFormatterTests.cs b/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/BinaryMessageFormatterTests.cs index 8af703d686..9a6f4ceeb2 100644 --- a/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/BinaryMessageFormatterTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/BinaryMessageFormatterTests.cs @@ -87,7 +87,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests [Fact] public void WriteInvalidMessages() { - var message = new Message(ReadableBuffer.Create(new byte[0]).Preserve(), MessageType.Binary, endOfMessage: false); + var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false); var ex = Assert.Throws(() => MessageFormatter.TryFormatMessage(message, Span.Empty, MessageFormat.Binary, out var written)); Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message); diff --git a/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/ServerSentEventsMessageFormatterTests.cs b/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/ServerSentEventsMessageFormatterTests.cs index 05cbd993ba..2d01660d4d 100644 --- a/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/ServerSentEventsMessageFormatterTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/ServerSentEventsMessageFormatterTests.cs @@ -35,7 +35,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests [Fact] public void WriteInvalidMessages() { - var message = new Message(ReadableBuffer.Create(new byte[0]).Preserve(), MessageType.Binary, endOfMessage: false); + var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false); var ex = Assert.Throws(() => ServerSentEventsMessageFormatter.TryFormatMessage(message, Span.Empty, out var written)); Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message); diff --git a/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/TextMessageFormatterTests.cs b/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/TextMessageFormatterTests.cs index a343cca1e5..06a1e4b5b6 100644 --- a/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/TextMessageFormatterTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Common.Tests/Formatters/TextMessageFormatterTests.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.IO.Pipelines; using System.Text; using Microsoft.AspNetCore.Sockets.Tests; using Xunit; @@ -73,7 +72,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests [Fact] public void WriteInvalidMessages() { - var message = new Message(ReadableBuffer.Create(new byte[0]).Preserve(), MessageType.Binary, endOfMessage: false); + var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false); var ex = Assert.Throws(() => MessageFormatter.TryFormatMessage(message, Span.Empty, MessageFormat.Text, out var written)); Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message); diff --git a/test/Microsoft.AspNetCore.Sockets.Common.Tests/MessageTestUtils.cs b/test/Microsoft.AspNetCore.Sockets.Common.Tests/MessageTestUtils.cs index 35cc394e47..b4476d5193 100644 --- a/test/Microsoft.AspNetCore.Sockets.Common.Tests/MessageTestUtils.cs +++ b/test/Microsoft.AspNetCore.Sockets.Common.Tests/MessageTestUtils.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.IO.Pipelines; @@ -13,20 +13,20 @@ namespace Microsoft.AspNetCore.Sockets.Tests { Assert.True(message.EndOfMessage); Assert.Equal(messageType, message.Type); - Assert.Equal(payload, message.Payload.Buffer.ToArray()); + Assert.Equal(payload, message.Payload); } public static void AssertMessage(Message message, MessageType messageType, string payload) { Assert.True(message.EndOfMessage); Assert.Equal(messageType, message.Type); - Assert.Equal(payload, Encoding.UTF8.GetString(message.Payload.Buffer.ToArray())); + Assert.Equal(payload, Encoding.UTF8.GetString(message.Payload)); } public static Message CreateMessage(byte[] payload, MessageType type = MessageType.Binary) { return new Message( - ReadableBuffer.Create(payload).Preserve(), + payload, type, endOfMessage: true); } @@ -34,7 +34,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests public static Message CreateMessage(string payload, MessageType type) { return new Message( - ReadableBuffer.Create(Encoding.UTF8.GetBytes(payload)).Preserve(), + Encoding.UTF8.GetBytes(payload), type, endOfMessage: true); } diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs index 9463093874..da7b4ff3e9 100644 --- a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; @@ -251,7 +251,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests var task = dispatcher.ExecuteAsync("", context); - var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(); + var buffer = Encoding.UTF8.GetBytes("Hello World"); // Write to the transport so the poll yields await state.Connection.Transport.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true)); @@ -276,7 +276,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests var task = dispatcher.ExecuteAsync("", context); - var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(); + var buffer = Encoding.UTF8.GetBytes("Hello World"); // Write to the application await state.Application.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true)); @@ -301,7 +301,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests var task = dispatcher.ExecuteAsync("", context); - var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(); + var buffer = Encoding.UTF8.GetBytes("Hello World"); // Write to the application await state.Application.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true)); diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs index 0f69028cd5..119f539ebc 100644 --- a/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.IO; @@ -39,7 +39,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests context.Response.Body = ms; await channel.Out.WriteAsync(new Message( - ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(), + Encoding.UTF8.GetBytes("Hello World"), MessageType.Text, endOfMessage: true)); diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs index c389bc2100..f66adaf308 100644 --- a/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.IO; @@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests context.Response.Body = ms; await channel.Out.WriteAsync(new Message( - ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(), + Encoding.UTF8.GetBytes("Hello World"), MessageType.Text, endOfMessage: true)); diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs index e38542f111..75b92e3514 100644 --- a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs @@ -47,12 +47,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests payload: ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello")))); await pair.ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure); - using (var message = await applicationSide.Input.In.ReadAsync()) - { - Assert.True(message.EndOfMessage); - Assert.Equal(format, message.Type); - Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload.Buffer.ToArray())); - } + var message = await applicationSide.Input.In.ReadAsync(); + Assert.True(message.EndOfMessage); + Assert.Equal(format, message.Type); + Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload)); Assert.True(applicationSide.Output.Out.TryComplete()); @@ -99,19 +97,15 @@ namespace Microsoft.AspNetCore.Sockets.Tests payload: ReadableBuffer.Create(Encoding.UTF8.GetBytes("World")))); await pair.ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure); - using (var message1 = await applicationSide.Input.In.ReadAsync()) - { - Assert.False(message1.EndOfMessage); - Assert.Equal(format, message1.Type); - Assert.Equal("Hello", Encoding.UTF8.GetString(message1.Payload.Buffer.ToArray())); - } + var message1 = await applicationSide.Input.In.ReadAsync(); + Assert.False(message1.EndOfMessage); + Assert.Equal(format, message1.Type); + Assert.Equal("Hello", Encoding.UTF8.GetString(message1.Payload)); - using (var message2 = await applicationSide.Input.In.ReadAsync()) - { - Assert.True(message2.EndOfMessage); - Assert.Equal(format, message2.Type); - Assert.Equal("World", Encoding.UTF8.GetString(message2.Payload.Buffer.ToArray())); - } + var message2 = await applicationSide.Input.In.ReadAsync(); + Assert.True(message2.EndOfMessage); + Assert.Equal(format, message2.Type); + Assert.Equal("World", Encoding.UTF8.GetString(message2.Payload)); Assert.True(applicationSide.Output.Out.TryComplete()); @@ -149,11 +143,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests // Write multi-frame message to the output channel, and then complete it await applicationSide.Output.Out.WriteAsync(new Message( - ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello")).Preserve(), + Encoding.UTF8.GetBytes("Hello"), format, endOfMessage: false)); await applicationSide.Output.Out.WriteAsync(new Message( - ReadableBuffer.Create(Encoding.UTF8.GetBytes("World")).Preserve(), + Encoding.UTF8.GetBytes("World"), format, endOfMessage: true)); Assert.True(applicationSide.Output.Out.TryComplete()); @@ -197,7 +191,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests // Write to the output channel, and then complete it await applicationSide.Output.Out.WriteAsync(new Message( - ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello")).Preserve(), + Encoding.UTF8.GetBytes("Hello"), format, endOfMessage: true)); Assert.True(applicationSide.Output.Out.TryComplete()); @@ -248,12 +242,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests await pair.ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure); // Read that frame from the input - using (var message = await applicationSide.Input.In.ReadAsync()) - { - Assert.True(message.EndOfMessage); - Assert.Equal(format, message.Type); - Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload.Buffer.ToArray())); - } + var message = await applicationSide.Input.In.ReadAsync(); + Assert.True(message.EndOfMessage); + Assert.Equal(format, message.Type); + Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload)); await transport; }