fix #209 by converting to byte[] (#229)

* fix #209 by converting to byte[]
This commit is contained in:
Andrew Stanton-Nurse 2017-02-22 11:50:37 -08:00 committed by GitHub
parent 701612c859
commit 9767dbd5c1
28 changed files with 186 additions and 231 deletions

View File

@ -38,8 +38,7 @@ namespace SocialWeather
var formatter = _formatterResolver.GetFormatter<T>(connection.Metadata.Get<string>("formatType"));
var ms = new MemoryStream();
await formatter.WriteAsync(data, ms);
var buffer = ReadableBuffer.Create(ms.ToArray()).Preserve();
await connection.Transport.Output.WriteAsync(new Message(buffer, MessageType.Binary, endOfMessage: true));
await connection.Transport.Output.WriteAsync(new Message(ms.ToArray(), MessageType.Binary, endOfMessage: true));
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
@ -39,7 +39,7 @@ namespace SocialWeather
{
Message message = await connection.Transport.Input.ReadAsync();
var stream = new MemoryStream();
await message.Payload.Buffer.CopyToAsync(stream);
await stream.WriteAsync(message.Payload, 0, message.Payload.Length);
WeatherReport weatherReport = await formatter.ReadAsync(stream);
await _lifetimeManager.SendToAllAsync(weatherReport);
}

View File

@ -26,11 +26,8 @@ namespace SocketsSample.EndPoints
Message message;
if (connection.Transport.Input.TryRead(out message))
{
using (message)
{
// We can avoid the copy here but we'll deal with that later
await Broadcast(message.Payload.Buffer, message.Type, message.EndOfMessage);
}
// We can avoid the copy here but we'll deal with that later
await Broadcast(message.Payload, message.Type, message.EndOfMessage);
}
}
}
@ -44,17 +41,17 @@ namespace SocketsSample.EndPoints
private Task Broadcast(string text)
{
return Broadcast(ReadableBuffer.Create(Encoding.UTF8.GetBytes(text)), MessageType.Text, endOfMessage: true);
return Broadcast(Encoding.UTF8.GetBytes(text), MessageType.Text, endOfMessage: true);
}
private Task Broadcast(ReadableBuffer payload, MessageType format, bool endOfMessage)
private Task Broadcast(byte[] payload, MessageType format, bool endOfMessage)
{
var tasks = new List<Task>(Connections.Count);
foreach (var c in Connections)
{
tasks.Add(c.Transport.Output.WriteAsync(new Message(
payload.Preserve(),
payload,
format,
endOfMessage)));
}

View File

@ -277,8 +277,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
private async Task WriteAsync(Connection connection, byte[] data)
{
var buffer = ReadableBuffer.Create(data).Preserve();
var message = new Message(buffer, MessageType.Text, endOfMessage: true);
var message = new Message(data, MessageType.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{

View File

@ -129,8 +129,7 @@ namespace Microsoft.AspNetCore.SignalR
var stream = new MemoryStream();
await invocationAdapter.WriteMessageAsync(invocation, stream);
var buffer = ReadableBuffer.Create(stream.ToArray()).Preserve();
var message = new Message(buffer, MessageType.Text, endOfMessage: true);
var message = new Message(stream.ToArray(), MessageType.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{

View File

@ -151,17 +151,13 @@ namespace Microsoft.AspNetCore.SignalR
{
while (await connection.Transport.Input.WaitToReadAsync(cts.Token))
{
Message incomingMessage;
while (connection.Transport.Input.TryRead(out incomingMessage))
while (connection.Transport.Input.TryRead(out var incomingMessage))
{
InvocationDescriptor invocationDescriptor;
using (incomingMessage)
{
var inputStream = new MemoryStream(incomingMessage.Payload.Buffer.ToArray());
var inputStream = new MemoryStream(incomingMessage.Payload);
// TODO: Handle receiving InvocationResultDescriptor
invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor;
}
// TODO: Handle receiving InvocationResultDescriptor
invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor;
// Is there a better way of detecting that a connection was closed?
if (invocationDescriptor == null)
@ -233,8 +229,7 @@ namespace Microsoft.AspNetCore.SignalR
var outStream = new MemoryStream();
await invocationAdapter.WriteMessageAsync(result, outStream);
var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve();
var outMessage = new Message(buffer, MessageType.Text, endOfMessage: true);
var outMessage = new Message(outStream.ToArray(), MessageType.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{

View File

@ -159,14 +159,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
if (Input.TryRead(out Message message))
{
using (message)
// Do not "simplify" - events can be removed from a different thread
var receivedEventHandler = Received;
if (receivedEventHandler != null)
{
// Do not "simplify" - events can be removed from a different thread
var receivedEventHandler = Received;
if (receivedEventHandler != null)
{
receivedEventHandler(message.Payload.Buffer.ToArray(), message.Type);
}
receivedEventHandler(message.Payload, message.Type);
}
}
}
@ -199,7 +196,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
return false;
}
var message = new Message(ReadableBuffer.Create(data).Preserve(), type);
var message = new Message(data, type);
while (await Output.WaitToWriteAsync(cancellationToken))
{

View File

@ -85,7 +85,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
var ms = new MemoryStream();
await response.Content.CopyToAsync(ms);
var message = new Message(ReadableBuffer.Create(ms.ToArray()).Preserve(), MessageType.Text);
var message = new Message(ms.ToArray(), MessageType.Text);
while (await _application.Output.WaitToWriteAsync(cancellationToken))
{
@ -121,15 +121,16 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
while (!cancellationToken.IsCancellationRequested && _application.Input.TryRead(out Message message))
{
using (message)
{
var request = new HttpRequestMessage(HttpMethod.Post, sendUrl);
request.Headers.UserAgent.Add(DefaultUserAgentHeader);
request.Content = new ReadableBufferContent(message.Payload.Buffer);
var request = new HttpRequestMessage(HttpMethod.Post, sendUrl);
request.Headers.UserAgent.Add(DefaultUserAgentHeader);
var response = await _httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
if (message.Payload != null && message.Payload.Length > 0)
{
request.Content = new ByteArrayContent(message.Payload);
}
var response = await _httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
}
}
}

View File

@ -1,29 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Sockets.Client
{
internal class ReadableBufferContent : HttpContent
{
private ReadableBuffer _buffer;
public ReadableBufferContent(ReadableBuffer buffer)
{
_buffer = buffer;
}
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) => _buffer.CopyToAsync(stream);
protected override bool TryComputeLength(out long length)
{
length = _buffer.Length;
return true;
}
}
}

View File

@ -51,7 +51,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
// TODO: Handle TCP connection errors
// https://github.com/SignalR/SignalR/blob/1fba14fa3437e24c204dfaf8a18db3fce8acad3c/src/Microsoft.AspNet.SignalR.Core/Owin/WebSockets/WebSocketHandler.cs#L248-L251
Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t => {
Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t =>
{
_application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
@ -71,7 +72,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
//Exceptions are handled above where the send and receive tasks are being run.
receiveResult = await _webSocket.ReceiveAsync(buffer, cancellationToken);
if(receiveResult.MessageType == WebSocketMessageType.Close)
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
_application.Output.Complete();
return;
@ -82,7 +83,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
} while (!receiveResult.EndOfMessage);
//Making sure the message type is either text or binary
Debug.Assert((receiveResult.MessageType == WebSocketMessageType.Binary || receiveResult.MessageType == WebSocketMessageType.Text ), "Unexpected message type");
Debug.Assert((receiveResult.MessageType == WebSocketMessageType.Binary || receiveResult.MessageType == WebSocketMessageType.Text), "Unexpected message type");
Message message;
var messageType = receiveResult.MessageType == WebSocketMessageType.Binary ? MessageType.Binary : MessageType.Text;
@ -90,17 +91,19 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
var messageBuffer = new byte[totalBytes];
var offset = 0;
for (var i = 0 ; i < incomingMessage.Count; i++)
for (var i = 0; i < incomingMessage.Count; i++)
{
Buffer.BlockCopy(incomingMessage[i].Array, 0, messageBuffer, offset, incomingMessage[i].Count);
offset += incomingMessage[i].Count;
}
message = new Message(ReadableBuffer.Create(messageBuffer).Preserve(), messageType, receiveResult.EndOfMessage);
message = new Message(messageBuffer, messageType, receiveResult.EndOfMessage);
}
else
{
message = new Message(ReadableBuffer.Create(incomingMessage[0].Array, incomingMessage[0].Offset, incomingMessage[0].Count).Preserve(), messageType, receiveResult.EndOfMessage);
var buffer = new byte[incomingMessage[0].Count];
Buffer.BlockCopy(incomingMessage[0].Array, incomingMessage[0].Offset, buffer, 0, incomingMessage[0].Count);
message = new Message(buffer, messageType, receiveResult.EndOfMessage);
}
while (await _application.Output.WaitToWriteAsync(cancellationToken))
@ -121,20 +124,17 @@ namespace Microsoft.AspNetCore.Sockets.Client
Message message;
while (_application.Input.TryRead(out message))
{
using (message)
try
{
try
{
await _webSocket.SendAsync(new ArraySegment<byte>(message.Payload.Buffer.ToArray()),
message.Type == MessageType.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary, true,
cancellationToken);
}
catch (OperationCanceledException ex)
{
_logger?.LogError(ex.Message);
await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken);
break;
}
await _webSocket.SendAsync(new ArraySegment<byte>(message.Payload),
message.Type == MessageType.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary, true,
cancellationToken);
}
catch (OperationCanceledException ex)
{
_logger?.LogError(ex.Message);
await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken);
break;
}
}
}

View File

@ -17,14 +17,14 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
internal static bool TryFormatMessage(Message message, Span<byte> buffer, out int bytesWritten)
{
// We can check the size needed right up front!
var sizeNeeded = sizeof(long) + 1 + message.Payload.Buffer.Length;
var sizeNeeded = sizeof(long) + 1 + message.Payload.Length;
if (buffer.Length < sizeNeeded)
{
bytesWritten = 0;
return false;
}
buffer.WriteBigEndian((long)message.Payload.Buffer.Length);
buffer.WriteBigEndian((long)message.Payload.Length);
if (!TryFormatType(message.Type, buffer.Slice(sizeof(long), 1)))
{
bytesWritten = 0;
@ -33,7 +33,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
buffer = buffer.Slice(sizeof(long) + 1);
message.Payload.Buffer.CopyTo(buffer);
message.Payload.CopyTo(buffer);
bytesWritten = sizeNeeded;
return true;
}
@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
var buf = new byte[length];
buffer.Slice(sizeof(long) + 1, length).CopyTo(buf);
message = new Message(ReadableBuffer.Create(buf).Preserve(), messageType, endOfMessage: true);
message = new Message(buf, messageType, endOfMessage: true);
bytesConsumed = sizeof(long) + 1 + length;
return true;
}

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
namespace Microsoft.AspNetCore.Sockets.Formatters
@ -47,7 +46,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
buffer = buffer.Slice(Newline.Length);
// Write the payload
if (!TryFormatPayload(message.Payload.Buffer, message.Type, buffer, out var writtenForPayload))
if (!TryFormatPayload(message.Payload, message.Type, buffer, out var writtenForPayload))
{
bytesWritten = 0;
return false;
@ -65,7 +64,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
return true;
}
private static bool TryFormatPayload(ReadableBuffer payload, MessageType type, Span<byte> buffer, out int bytesWritten)
private static bool TryFormatPayload(ReadOnlySpan<byte> payload, MessageType type, Span<byte> buffer, out int bytesWritten)
{
// Short-cut for empty payload
if (payload.Length == 0)
@ -98,27 +97,54 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
}
else
{
while (true)
// We can't just use while(payload.Length > 0) because we need to write a blank final "data: " line
// if the payload ends in a newline. For example, consider the following payload:
// "Hello\n"
// It needs to be written as:
// data: Hello\r\n
// data: \r\n
// \r\n
// Since we slice past the newline when we find it, after writing "Hello" in the previous example, we'll
// end up with an empty payload buffer, BUT we need to write it as an empty 'data:' line, so we need
// to use a condition that ensure the only time we stop writing is when we write the slice after the final
// newline.
var keepWriting = true;
while (keepWriting)
{
// Seek to the end of buffer or newline
var sliced = payload.TrySliceTo(LineFeed, out var slice, out var cursor);
var sliceEnd = payload.IndexOf(LineFeed);
var nextSliceStart = sliceEnd + 1;
if (sliceEnd < 0)
{
sliceEnd = payload.Length;
nextSliceStart = sliceEnd + 1;
if (!TryFormatLine(sliced ? slice : payload, buffer, out var writtenByLine))
// This is the last span
keepWriting = false;
}
if (sliceEnd > 0 && payload[sliceEnd - 1] == '\r')
{
sliceEnd--;
}
var slice = payload.Slice(0, sliceEnd);
if (nextSliceStart >= payload.Length)
{
payload = Span<byte>.Empty;
}
else
{
payload = payload.Slice(nextSliceStart);
}
if (!TryFormatLine(slice, buffer, out var writtenByLine))
{
bytesWritten = 0;
return false;
}
buffer = buffer.Slice(writtenByLine);
writtenSoFar += writtenByLine;
if (sliced)
{
payload = payload.Slice(payload.Move(cursor, 1));
}
else
{
break;
}
}
}
@ -126,13 +152,13 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
return true;
}
private static bool TryFormatLine(ReadableBuffer slice, Span<byte> buffer, out int bytesWritten)
private static bool TryFormatLine(ReadOnlySpan<byte> line, Span<byte> buffer, out int bytesWritten)
{
// We're going to write the whole thing. HOWEVER, if the last byte is a '\r', we want to truncate it
// because it was the '\r' in a '\r\n' newline sequence
// This won't require an additional byte in the buffer because after this line we have to write a newline sequence anyway.
var writtenSoFar = 0;
if (buffer.Length < DataPrefix.Length + slice.Length)
if (buffer.Length < DataPrefix.Length + line.Length)
{
bytesWritten = 0;
return false;
@ -141,8 +167,8 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
writtenSoFar += DataPrefix.Length;
buffer = buffer.Slice(DataPrefix.Length);
slice.CopyTo(buffer);
var sliceTo = slice.Length;
line.CopyTo(buffer);
var sliceTo = line.Length;
if (sliceTo > 0 && buffer[sliceTo - 1] == '\r')
{
sliceTo -= 1;

View File

@ -19,10 +19,10 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
internal static bool TryFormatMessage(Message message, Span<byte> buffer, out int bytesWritten)
{
// Calculate the length, it's the number of characters for text messages, but number of base64 characters for binary
var length = message.Payload.Buffer.Length;
var length = message.Payload.Length;
if (message.Type == MessageType.Binary)
{
length = (int)(4 * Math.Ceiling(((double)message.Payload.Buffer.Length / 3)));
length = (int)(4 * Math.Ceiling(((double)message.Payload.Length / 3)));
}
// Write the length as a string
@ -58,7 +58,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
{
// Encode the payload. For now, we make it an array and use the old-fashioned types because we need to mirror packages
// I've filed https://github.com/aspnet/SignalR/issues/192 to update this. -anurse
var payload = Convert.ToBase64String(message.Payload.Buffer.ToArray());
var payload = Convert.ToBase64String(message.Payload);
if (!TextEncoder.Utf8.TryEncode(payload, buffer, out int payloadWritten))
{
bytesWritten = 0;
@ -69,14 +69,14 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
}
else
{
if (buffer.Length < message.Payload.Buffer.Length)
if (buffer.Length < message.Payload.Length)
{
bytesWritten = 0;
return false;
}
message.Payload.Buffer.CopyTo(buffer.Slice(0, message.Payload.Buffer.Length));
written += message.Payload.Buffer.Length;
buffer = buffer.Slice(message.Payload.Buffer.Length);
message.Payload.CopyTo(buffer.Slice(0, message.Payload.Length));
written += message.Payload.Length;
buffer = buffer.Slice(message.Payload.Length);
}
// Terminator
@ -165,14 +165,13 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
// Parse the payload. For now, we make it an array and use the old-fashioned types.
// I've filed https://github.com/aspnet/SignalR/issues/192 to update this. -anurse
var payloadArray = payloadBuffer.ToArray();
PreservedBuffer payload;
var payload = payloadBuffer.ToArray();
if (messageType == MessageType.Binary)
{
byte[] decoded;
try
{
var str = Encoding.UTF8.GetString(payloadArray);
var str = Encoding.UTF8.GetString(payload);
decoded = Convert.FromBase64String(str);
}
catch
@ -182,11 +181,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
bytesConsumed = 0;
return false;
}
payload = ReadableBuffer.Create(decoded).Preserve();
}
else
{
payload = ReadableBuffer.Create(payloadArray).Preserve();
payload = decoded;
}
// Verify the trailer

View File

@ -2,32 +2,30 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
namespace Microsoft.AspNetCore.Sockets
{
public struct Message : IDisposable
public struct Message
{
public bool EndOfMessage { get; }
public MessageType Type { get; }
public PreservedBuffer Payload { get; }
public Message(PreservedBuffer payload, MessageType type)
// REVIEW: We need a better primitive to use here. Memory<byte> would be good,
// but @davidfowl has concerns about allocating OwnedMemory and how to dispose
// it properly
public byte[] Payload { get; }
public Message(byte[] payload, MessageType type)
: this(payload, type, endOfMessage: true)
{
}
public Message(PreservedBuffer payload, MessageType type, bool endOfMessage)
public Message(byte[] payload, MessageType type, bool endOfMessage)
{
Type = type;
EndOfMessage = endOfMessage;
Payload = payload;
}
public void Dispose()
{
Payload.Dispose();
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -331,7 +331,7 @@ namespace Microsoft.AspNetCore.Sockets
: MessageType.Text;
var message = new Message(
ReadableBuffer.Create(buffer).Preserve(),
buffer,
format,
endOfMessage: true);

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -37,12 +37,9 @@ namespace Microsoft.AspNetCore.Sockets.Transports
Message message;
if (_application.TryRead(out message))
{
using (message)
{
_logger.LogDebug("Writing {0} byte message to response", message.Payload.Buffer.Length);
context.Response.ContentLength = message.Payload.Buffer.Length;
await message.Payload.Buffer.CopyToAsync(context.Response.Body);
}
_logger.LogDebug("Writing {0} byte message to response", message.Payload.Length);
context.Response.ContentLength = message.Payload.Length;
await context.Response.Body.WriteAsync(message.Payload, 0, message.Payload.Length);
}
}
catch (OperationCanceledException)

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -33,13 +33,9 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
while (await _application.WaitToReadAsync(token))
{
Message message;
while (_application.TryRead(out message))
while (_application.TryRead(out var message))
{
using (message)
{
await Send(context, message);
}
await Send(context, message);
}
}
}
@ -53,8 +49,8 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
// TODO: Pooled buffers
// 8 = 6(data: ) + 2 (\n\n)
_logger.LogDebug("Sending {0} byte message to Server-Sent Events client", message.Payload.Buffer.Length);
var buffer = new byte[8 + message.Payload.Buffer.Length];
_logger.LogDebug("Sending {0} byte message to Server-Sent Events client", message.Payload.Length);
var buffer = new byte[8 + message.Payload.Length];
var at = 0;
buffer[at++] = (byte)'d';
buffer[at++] = (byte)'a';
@ -62,8 +58,8 @@ namespace Microsoft.AspNetCore.Sockets.Transports
buffer[at++] = (byte)'a';
buffer[at++] = (byte)':';
buffer[at++] = (byte)' ';
message.Payload.Buffer.CopyTo(new Span<byte>(buffer, at, message.Payload.Buffer.Length));
at += message.Payload.Buffer.Length;
message.Payload.CopyTo(new Span<byte>(buffer, at, message.Payload.Length));
at += message.Payload.Length;
buffer[at++] = (byte)'\n';
buffer[at++] = (byte)'\n';
await context.Response.Body.WriteAsync(buffer, 0, at);

View File

@ -3,6 +3,7 @@
using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
@ -137,7 +138,8 @@ namespace Microsoft.AspNetCore.Sockets.Transports
}
// Create a Message for the frame
var message = new Message(frame.Payload.Preserve(), effectiveOpcode == WebSocketOpcode.Binary ? MessageType.Binary : MessageType.Text, frame.EndOfMessage);
// This has to copy the buffer :(.
var message = new Message(frame.Payload.ToArray(), effectiveOpcode == WebSocketOpcode.Binary ? MessageType.Binary : MessageType.Text, frame.EndOfMessage);
// Write the message to the channel
return _application.Output.WriteAsync(message);
@ -160,31 +162,28 @@ namespace Microsoft.AspNetCore.Sockets.Transports
Message message;
while (_application.Input.TryRead(out message))
{
using (message)
if (message.Payload.Length > 0)
{
if (message.Payload.Buffer.Length > 0)
try
{
try
{
var opcode = message.Type == MessageType.Binary ?
WebSocketOpcode.Binary :
WebSocketOpcode.Text;
var opcode = message.Type == MessageType.Binary ?
WebSocketOpcode.Binary :
WebSocketOpcode.Text;
var frame = new WebSocketFrame(
endOfMessage: message.EndOfMessage,
opcode: _lastFrameIncomplete ? WebSocketOpcode.Continuation : opcode,
payload: message.Payload.Buffer);
var frame = new WebSocketFrame(
endOfMessage: message.EndOfMessage,
opcode: _lastFrameIncomplete ? WebSocketOpcode.Continuation : opcode,
payload: ReadableBuffer.Create(message.Payload));
_lastFrameIncomplete = !message.EndOfMessage;
_lastFrameIncomplete = !message.EndOfMessage;
LogFrame("Sending", frame);
await ws.SendAsync(frame);
}
catch (Exception ex)
{
_logger.LogError("Error writing frame to output: {0}", ex);
break;
}
LogFrame("Sending", frame);
await ws.SendAsync(frame);
}
catch (Exception ex)
{
_logger.LogError("Error writing frame to output: {0}", ex);
break;
}
}
}

View File

@ -63,8 +63,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
},
stream);
var buffer = ReadableBuffer.Create(stream.ToArray()).Preserve();
await Application.Output.WriteAsync(new Message(buffer, MessageType.Binary, endOfMessage: true));
await Application.Output.WriteAsync(new Message(stream.ToArray(), MessageType.Binary, endOfMessage: true));
}
public async Task<T> Read<T>() where T : InvocationMessage
@ -87,11 +86,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
Message message;
if (Application.Input.TryRead(out message))
{
using (message)
{
var value = await _adapter.ReadMessageAsync(new MemoryStream(message.Payload.Buffer.ToArray()), _binder);
return value as T;
}
var value = await _adapter.ReadMessageAsync(new MemoryStream(message.Payload), _binder);
return value as T;
}
return null;

View File

@ -154,7 +154,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
// The channel needs to be drained for the Completion task to be completed
while (transportToConnection.In.TryRead(out Message message))
{
message.Dispose();
}
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion);

View File

@ -87,7 +87,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests
[Fact]
public void WriteInvalidMessages()
{
var message = new Message(ReadableBuffer.Create(new byte[0]).Preserve(), MessageType.Binary, endOfMessage: false);
var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false);
var ex = Assert.Throws<InvalidOperationException>(() =>
MessageFormatter.TryFormatMessage(message, Span<byte>.Empty, MessageFormat.Binary, out var written));
Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message);

View File

@ -35,7 +35,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests
[Fact]
public void WriteInvalidMessages()
{
var message = new Message(ReadableBuffer.Create(new byte[0]).Preserve(), MessageType.Binary, endOfMessage: false);
var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false);
var ex = Assert.Throws<InvalidOperationException>(() =>
ServerSentEventsMessageFormatter.TryFormatMessage(message, Span<byte>.Empty, out var written));
Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message);

View File

@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using Microsoft.AspNetCore.Sockets.Tests;
using Xunit;
@ -73,7 +72,7 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests
[Fact]
public void WriteInvalidMessages()
{
var message = new Message(ReadableBuffer.Create(new byte[0]).Preserve(), MessageType.Binary, endOfMessage: false);
var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false);
var ex = Assert.Throws<InvalidOperationException>(() =>
MessageFormatter.TryFormatMessage(message, Span<byte>.Empty, MessageFormat.Text, out var written));
Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message);

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
@ -13,20 +13,20 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
Assert.True(message.EndOfMessage);
Assert.Equal(messageType, message.Type);
Assert.Equal(payload, message.Payload.Buffer.ToArray());
Assert.Equal(payload, message.Payload);
}
public static void AssertMessage(Message message, MessageType messageType, string payload)
{
Assert.True(message.EndOfMessage);
Assert.Equal(messageType, message.Type);
Assert.Equal(payload, Encoding.UTF8.GetString(message.Payload.Buffer.ToArray()));
Assert.Equal(payload, Encoding.UTF8.GetString(message.Payload));
}
public static Message CreateMessage(byte[] payload, MessageType type = MessageType.Binary)
{
return new Message(
ReadableBuffer.Create(payload).Preserve(),
payload,
type,
endOfMessage: true);
}
@ -34,7 +34,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public static Message CreateMessage(string payload, MessageType type)
{
return new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes(payload)).Preserve(),
Encoding.UTF8.GetBytes(payload),
type,
endOfMessage: true);
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -251,7 +251,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var task = dispatcher.ExecuteAsync<TestEndPoint>("", context);
var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve();
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the transport so the poll yields
await state.Connection.Transport.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true));
@ -276,7 +276,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var task = dispatcher.ExecuteAsync<BlockingEndPoint>("", context);
var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve();
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the application
await state.Application.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true));
@ -301,7 +301,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var task = dispatcher.ExecuteAsync<BlockingEndPoint>("", context);
var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve();
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the application
await state.Application.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true));

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
@ -39,7 +39,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
context.Response.Body = ms;
await channel.Out.WriteAsync(new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(),
Encoding.UTF8.GetBytes("Hello World"),
MessageType.Text,
endOfMessage: true));

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
context.Response.Body = ms;
await channel.Out.WriteAsync(new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(),
Encoding.UTF8.GetBytes("Hello World"),
MessageType.Text,
endOfMessage: true));

View File

@ -47,12 +47,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
payload: ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello"))));
await pair.ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure);
using (var message = await applicationSide.Input.In.ReadAsync())
{
Assert.True(message.EndOfMessage);
Assert.Equal(format, message.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload.Buffer.ToArray()));
}
var message = await applicationSide.Input.In.ReadAsync();
Assert.True(message.EndOfMessage);
Assert.Equal(format, message.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload));
Assert.True(applicationSide.Output.Out.TryComplete());
@ -99,19 +97,15 @@ namespace Microsoft.AspNetCore.Sockets.Tests
payload: ReadableBuffer.Create(Encoding.UTF8.GetBytes("World"))));
await pair.ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure);
using (var message1 = await applicationSide.Input.In.ReadAsync())
{
Assert.False(message1.EndOfMessage);
Assert.Equal(format, message1.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message1.Payload.Buffer.ToArray()));
}
var message1 = await applicationSide.Input.In.ReadAsync();
Assert.False(message1.EndOfMessage);
Assert.Equal(format, message1.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message1.Payload));
using (var message2 = await applicationSide.Input.In.ReadAsync())
{
Assert.True(message2.EndOfMessage);
Assert.Equal(format, message2.Type);
Assert.Equal("World", Encoding.UTF8.GetString(message2.Payload.Buffer.ToArray()));
}
var message2 = await applicationSide.Input.In.ReadAsync();
Assert.True(message2.EndOfMessage);
Assert.Equal(format, message2.Type);
Assert.Equal("World", Encoding.UTF8.GetString(message2.Payload));
Assert.True(applicationSide.Output.Out.TryComplete());
@ -149,11 +143,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
// Write multi-frame message to the output channel, and then complete it
await applicationSide.Output.Out.WriteAsync(new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello")).Preserve(),
Encoding.UTF8.GetBytes("Hello"),
format,
endOfMessage: false));
await applicationSide.Output.Out.WriteAsync(new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes("World")).Preserve(),
Encoding.UTF8.GetBytes("World"),
format,
endOfMessage: true));
Assert.True(applicationSide.Output.Out.TryComplete());
@ -197,7 +191,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
// Write to the output channel, and then complete it
await applicationSide.Output.Out.WriteAsync(new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello")).Preserve(),
Encoding.UTF8.GetBytes("Hello"),
format,
endOfMessage: true));
Assert.True(applicationSide.Output.Out.TryComplete());
@ -248,12 +242,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
await pair.ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure);
// Read that frame from the input
using (var message = await applicationSide.Input.In.ReadAsync())
{
Assert.True(message.EndOfMessage);
Assert.Equal(format, message.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload.Buffer.ToArray()));
}
var message = await applicationSide.Input.In.ReadAsync();
Assert.True(message.EndOfMessage);
Assert.Equal(format, message.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload));
await transport;
}