add support for binary format in /poll (#303)

This commit is contained in:
Andrew Stanton-Nurse 2017-03-16 11:06:26 -07:00 committed by GitHub
parent 4a8a28ec16
commit 1732ac5760
10 changed files with 246 additions and 41 deletions

View File

@ -53,8 +53,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
_application = application;
// Start sending and polling
_poller = Poll(Utils.AppendPath(url, "poll"), _transportCts.Token);
// Start sending and polling (ask for binary if the server supports it)
var pollUrl = Utils.AppendQueryString(Utils.AppendPath(url, "poll"), "supportsBinary=true");
_poller = Poll(pollUrl, _transportCts.Token);
_sender = SendMessages(Utils.AppendPath(url, "send"), _transportCts.Token);
Running = Task.WhenAll(_sender, _poller).ContinueWith(t =>
@ -110,11 +111,13 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.LogDebug("Received messages from the server");
var messageFormat = MessageParser.GetFormatFromContentType(response.Content.Headers.ContentType.ToString());
// Until Pipeline starts natively supporting BytesReader, this is the easiest way to do this.
var payload = await response.Content.ReadAsByteArrayAsync();
if (payload.Length > 0)
{
var messages = ParsePayload(payload);
var messages = ParsePayload(payload, messageFormat);
foreach (var message in messages)
{
@ -148,10 +151,13 @@ namespace Microsoft.AspNetCore.Sockets.Client
_logger.LogInformation("Receive loop stopped");
}
private IList<Message> ParsePayload(byte[] payload)
private IList<Message> ParsePayload(byte[] payload, MessageFormat messageFormat)
{
var reader = new BytesReader(payload);
var messageFormat = MessageParser.GetFormat(reader.Unread[0]);
if (messageFormat != MessageParser.GetFormatFromIndicator(reader.Unread[0]))
{
throw new FormatException($"Format indicator '{(char)reader.Unread[0]}' does not match format determined by Content-Type '{MessageFormatter.GetContentType(messageFormat)}'");
}
reader.Advance(1);
_parser.Reset();

View File

@ -26,11 +26,19 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
var builder = new UriBuilder(url);
var newQueryString = builder.Query;
if (!string.IsNullOrEmpty(builder.Query))
{
builder.Query += "&";
newQueryString += "&";
}
builder.Query += qs;
newQueryString += qs;
if (newQueryString.Length > 0 && newQueryString[0] == '?')
{
newQueryString = newQueryString.Substring(1);
}
builder.Query = newQueryString;
return builder.Uri;
}
}

View File

@ -24,7 +24,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
_binaryParser.TryParseMessage(ref buffer, out message);
}
public static MessageFormat GetFormat(byte formatIndicator)
public static MessageFormat GetFormatFromIndicator(byte formatIndicator)
{
// Can't use switch because our "constants" are not consts, they're "static readonly" (which is good, because they are public)
if (formatIndicator == MessageFormatter.TextFormatIndicator)
@ -39,5 +39,21 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
throw new ArgumentException($"Invalid message format: 0x{formatIndicator:X}", nameof(formatIndicator));
}
public static MessageFormat GetFormatFromContentType(string contentType)
{
// Can't use switch because our "constants" are not consts, they're "static readonly" (which is good, because they are public)
if (string.Equals(contentType, MessageFormatter.TextContentType, StringComparison.OrdinalIgnoreCase))
{
return MessageFormat.Text;
}
if (string.Equals(contentType, MessageFormatter.BinaryContentType, StringComparison.OrdinalIgnoreCase))
{
return MessageFormat.Binary;
}
throw new ArgumentException($"Invalid Content-Type: '{contentType}'", nameof(contentType));
}
}
}

View File

@ -38,8 +38,10 @@ namespace Microsoft.AspNetCore.Sockets.Transports
return;
}
// TODO: Add support for binary protocol
var messageFormat = MessageFormat.Text;
// REVIEW: We could also use the 'Accept' header, in theory...
var messageFormat = string.Equals(context.Request.Query["supportsBinary"], "true", StringComparison.OrdinalIgnoreCase) ?
MessageFormat.Binary :
MessageFormat.Text;
context.Response.ContentType = MessageFormatter.GetContentType(messageFormat);
var writer = context.Response.Body.AsPipelineWriter();

View File

@ -67,19 +67,22 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
get
{
yield return new object[] { new WebSocketsTransport() };
yield return new object[] { new LongPollingTransport(new HttpClient()) };
yield return new object[] { new Func<ILoggerFactory, ITransport>(loggerFactory => new WebSocketsTransport(loggerFactory)) };
yield return new object[] { new Func<ILoggerFactory, ITransport>(loggerFactory => new LongPollingTransport(new HttpClient(), loggerFactory)) };
}
}
[ConditionalTheory]
[OSSkipCondition(OperatingSystems.Windows, WindowsVersions.Win7, WindowsVersions.Win2008R2, SkipReason = "No WebSockets Client for this platform")]
[MemberData(nameof(Transports))]
public async Task ConnectionCanSendAndReceiveMessages(ITransport transport)
public async Task ConnectionCanSendAndReceiveMessages(Func<ILoggerFactory, ITransport> transportFactory)
{
const string message = "Major Key";
var baseUrl = _serverFixture.BaseUrl;
var loggerFactory = new LoggerFactory();
loggerFactory.AddXUnit(_output, LogLevel.Trace);
var transport = transportFactory(loggerFactory);
using (var httpClient = new HttpClient())
{
@ -119,8 +122,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
get
{
yield return new object[] { new string('A', 5 * 1024)};
yield return new object[] { new string('A', 5 * 1024 * 1024 + 32)};
yield return new object[] { new string('A', 5 * 1024) };
yield return new object[] { new string('A', 5 * 1024 * 1024 + 32) };
}
}

View File

@ -1,12 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.AspNetCore.Sockets.Tests.Internal;
using Microsoft.Extensions.Logging;
using Moq;
using Moq.Protected;
using System;
using System.Linq;
using System.Net;
@ -15,6 +9,13 @@ using System.Text;
using System.Text.Formatting;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.AspNetCore.Sockets.Tests.Internal;
using Microsoft.Extensions.Logging;
using Moq;
using Moq.Protected;
using Xunit;
namespace Microsoft.AspNetCore.Sockets.Client.Tests
@ -265,7 +266,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -438,7 +439,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -479,7 +480,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
sendTcs.SetResult(await request.Content.ReadAsByteArrayAsync());
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -555,9 +556,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await Task.Yield();
if (request.RequestUri.AbsolutePath.EndsWith("/send"))
{
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError);
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -589,7 +590,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
content = "T2:T:42;";
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(content) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK, MessageFormatter.TextContentType, content);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))

View File

@ -6,6 +6,7 @@ using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.Extensions.Logging;
@ -41,7 +42,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -73,7 +74,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -120,7 +121,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -152,7 +153,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))

View File

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
@ -10,13 +11,16 @@ using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.Extensions.Logging;
using Moq;
using Moq.Protected;
using Xunit;
namespace Microsoft.AspNetCore.Sockets.Client.Tests
namespace Microsoft.AspNetCore.Client.Tests
{
public class LongPollingTransportTests
{
@ -29,7 +33,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
Task transportActiveTask;
@ -67,7 +71,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.NoContent) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -100,7 +104,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -136,7 +140,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
var statusCode = request.RequestUri.AbsolutePath.EndsWith("send")
? HttpStatusCode.InternalServerError
: HttpStatusCode.OK;
return new HttpResponseMessage(statusCode) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(statusCode);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -177,7 +181,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
@ -204,6 +208,127 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
}
}
[Fact]
public async Task LongPollingTransportThrowsIfFormatIndicatorDoesNotMatchContentType()
{
var encoded = new byte[] { (byte)'T' };
var firstCall = true;
var mockHttpHandler = new Mock<HttpMessageHandler>();
var sentRequests = new List<HttpRequestMessage>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
sentRequests.Add(request);
await Task.Yield();
if (firstCall)
{
firstCall = false;
return ResponseUtils.CreateResponse(HttpStatusCode.OK, MessageFormatter.BinaryContentType, encoded);
}
return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
try
{
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<SendMessage, Message>(connectionToTransport, transportToConnection);
// Start the transport
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
// Transport should fail
var ex = await Assert.ThrowsAsync<FormatException>(() => longPollingTransport.Running.OrTimeout());
Assert.Equal($"Format indicator 'T' does not match format determined by Content-Type '{MessageFormatter.BinaryContentType}'", ex.Message);
}
finally
{
await longPollingTransport.StopAsync();
}
}
}
[Fact]
public async Task LongPollingTransportDispatchesMessagesReceivedFromPoll()
{
var message1Payload = new byte[] { (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o' };
var message2Payload = new byte[] { (byte)'W', (byte)'o', (byte)'r', (byte)'l', (byte)'d' };
var encoded = Enumerable.SelectMany(new[] {
new byte[] { (byte)'B', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00 }, message1Payload,
new byte[] { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x01 }, message2Payload
}, b => b).ToArray();
var firstCall = true;
var mockHttpHandler = new Mock<HttpMessageHandler>();
var sentRequests = new List<HttpRequestMessage>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
sentRequests.Add(request);
await Task.Yield();
if (firstCall)
{
firstCall = false;
return ResponseUtils.CreateResponse(HttpStatusCode.OK, MessageFormatter.BinaryContentType, encoded);
}
return ResponseUtils.CreateResponse(HttpStatusCode.NoContent);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
try
{
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<SendMessage, Message>(connectionToTransport, transportToConnection);
// Start the transport
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
// Wait for the transport to finish
await longPollingTransport.Running.OrTimeout();
// Pull Messages out of the channel
var messages = new List<Message>();
while (await transportToConnection.In.WaitToReadAsync())
{
while (transportToConnection.In.TryRead(out var message))
{
messages.Add(message);
}
}
// Check the provided request
Assert.Equal(2, sentRequests.Count);
Assert.Equal("?supportsBinary=true", sentRequests[0].RequestUri.Query);
// Check the messages received
Assert.Equal(2, messages.Count);
Assert.Equal(MessageType.Text, messages[0].Type);
Assert.Equal(message1Payload, messages[0].Payload);
Assert.Equal(MessageType.Binary, messages[1].Type);
Assert.Equal(message2Payload, messages[1].Payload);
}
finally
{
await longPollingTransport.StopAsync();
}
}
}
[Fact]
public async Task LongPollingTransportSendsAvailableMessagesWhenTheyArrive()
{
@ -220,7 +345,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
// Build a new request object, but convert the entire payload to string
sentRequests.Add(await request.Content.ReadAsByteArrayAsync());
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))

View File

@ -0,0 +1,31 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
namespace Microsoft.AspNetCore.Client.Tests
{
internal static class ResponseUtils
{
public static HttpResponseMessage CreateResponse(HttpStatusCode statusCode) =>
CreateResponse(statusCode, MessageFormatter.TextContentType, string.Empty);
public static HttpResponseMessage CreateResponse(HttpStatusCode statusCode, string contentType, string payload) =>
CreateResponse(statusCode, contentType, new StringContent(payload));
public static HttpResponseMessage CreateResponse(HttpStatusCode statusCode, string contentType, byte[] payload) =>
CreateResponse(statusCode, contentType, new ByteArrayContent(payload));
public static HttpResponseMessage CreateResponse(HttpStatusCode statusCode, string contentType, HttpContent payload)
{
payload.Headers.ContentType = MediaTypeHeaderValue.Parse(contentType);
return new HttpResponseMessage(statusCode)
{
Content = payload
};
}
}
}

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;
@ -51,11 +52,17 @@ namespace Microsoft.AspNetCore.Sockets.Tests
Assert.Equal("T11:T:Hello World;", Encoding.UTF8.GetString(ms.ToArray()));
}
[Fact]
public async Task MultipleFramesSentAsSingleResponse()
[Theory]
[InlineData(MessageFormat.Text, "T5:T:Hello;1:T: ;5:T:World;")]
[InlineData(MessageFormat.Binary, "QgAAAAAAAAAFAEhlbGxvAAAAAAAAAAEAIAAAAAAAAAAFAFdvcmxk")]
public async Task MultipleFramesSentAsSingleResponse(MessageFormat format, string expectedPayload)
{
var channel = Channel.CreateUnbounded<Message>();
var context = new DefaultHttpContext();
if (format == MessageFormat.Binary)
{
context.Request.QueryString = QueryString.Create("supportsBinary", "true");
}
var poll = new LongPollingTransport(channel, new LoggerFactory());
var ms = new MemoryStream();
context.Response.Body = ms;
@ -78,7 +85,12 @@ namespace Microsoft.AspNetCore.Sockets.Tests
await poll.ProcessRequestAsync(context, context.RequestAborted);
Assert.Equal(200, context.Response.StatusCode);
Assert.Equal("T5:T:Hello;1:T: ;5:T:World;", Encoding.UTF8.GetString(ms.ToArray()));
var payload = ms.ToArray();
var encoded = format == MessageFormat.Binary ?
Convert.ToBase64String(payload) :
Encoding.UTF8.GetString(payload);
Assert.Equal(expectedPayload, encoded);
}
}
}