diff --git a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs index c7e050941f..872c4ca820 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs @@ -4,14 +4,9 @@ using System; using System.Buffers; using System.Collections.Generic; -using System.IO; -using System.IO.Pipelines; -using System.IO.Pipelines.Text.Primitives; using System.Net; using System.Net.Http; using System.Net.Http.Headers; -using System.Text; -using System.Text.Formatting; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Sockets.Internal.Formatters; @@ -56,7 +51,7 @@ namespace Microsoft.AspNetCore.Sockets.Client // Start sending and polling (ask for binary if the server supports it) var pollUrl = Utils.AppendQueryString(Utils.AppendPath(url, "poll"), "supportsBinary=true"); _poller = Poll(pollUrl, _transportCts.Token); - _sender = SendMessages(Utils.AppendPath(url, "send"), _transportCts.Token); + _sender = SendUtils.SendMessages(Utils.AppendPath(url, "send"), _application, _httpClient, _transportCts, _logger); Running = Task.WhenAll(_sender, _poller).ContinueWith(t => { @@ -177,111 +172,5 @@ namespace Microsoft.AspNetCore.Sockets.Client } return messages; } - - private async Task SendMessages(Uri sendUrl, CancellationToken cancellationToken) - { - _logger.LogInformation("Starting the send loop"); - IList messages = null; - try - { - while (await _application.Input.WaitToReadAsync(cancellationToken)) - { - // Grab as many messages as we can from the channel - messages = new List(); - while (!cancellationToken.IsCancellationRequested && _application.Input.TryRead(out SendMessage message)) - { - messages.Add(message); - } - - if (messages.Count > 0) - { - _logger.LogDebug("Sending {0} message(s) to the server using url: {1}", messages.Count, sendUrl); - - // Send them in a single post - var request = new HttpRequestMessage(HttpMethod.Post, sendUrl); - request.Headers.UserAgent.Add(DefaultUserAgentHeader); - - // TODO: We can probably use a pipeline here or some kind of pooled memory. - // But where do we get the pool from? ArrayBufferPool.Instance? - var memoryStream = new MemoryStream(); - - // Write the messages to the stream - var pipe = memoryStream.AsPipelineWriter(); - var output = new PipelineTextOutput(pipe, TextEncoder.Utf8); // We don't need the Encoder, but it's harmless to set. - await WriteMessagesAsync(messages, output, MessageFormat.Binary); - - // Seek back to the start - memoryStream.Seek(0, SeekOrigin.Begin); - - // Set the, now filled, stream as the content - request.Content = new StreamContent(memoryStream); - request.Content.Headers.ContentType = MediaTypeHeaderValue.Parse(MessageFormatter.GetContentType(MessageFormat.Binary)); - - var response = await _httpClient.SendAsync(request); - response.EnsureSuccessStatusCode(); - - _logger.LogDebug("Message(s) sent successfully"); - foreach (var message in messages) - { - message.SendResult?.TrySetResult(null); - } - } - else - { - _logger.LogDebug("No messages in batch to send"); - } - } - } - catch (OperationCanceledException) - { - // transport is being closed - if (messages != null) - { - foreach (var message in messages) - { - // This will no-op for any messages that were already marked as completed. - message.SendResult?.TrySetCanceled(); - } - } - } - catch (Exception ex) - { - _logger.LogError("Error while sending to '{0}': {1}", sendUrl, ex); - if (messages != null) - { - foreach (var message in messages) - { - // This will no-op for any messages that were already marked as completed. - message.SendResult?.TrySetException(ex); - } - } - throw; - } - finally - { - // Make sure the poll loop is terminated - _transportCts.Cancel(); - } - - _logger.LogInformation("Send loop stopped"); - } - - private async Task WriteMessagesAsync(IList messages, PipelineTextOutput output, MessageFormat format) - { - output.Append(MessageFormatter.GetFormatIndicator(format), TextEncoder.Utf8); - - foreach (var message in messages) - { - _logger.LogDebug("Writing '{0}' message to the server", message.Type); - - var payload = message.Payload ?? Array.Empty(); - if (!MessageFormatter.TryWriteMessage(new Message(payload, message.Type, endOfMessage: true), output, format)) - { - // We didn't get any more memory! - throw new InvalidOperationException("Unable to write message to pipeline"); - } - await output.FlushAsync(); - } - } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client/SendUtils.cs b/src/Microsoft.AspNetCore.Sockets.Client/SendUtils.cs new file mode 100644 index 0000000000..21a0f2773a --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets.Client/SendUtils.cs @@ -0,0 +1,131 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Pipelines; +using System.IO.Pipelines.Text.Primitives; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Text.Formatting; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Sockets.Internal.Formatters; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AspNetCore.Sockets.Client +{ + internal static class SendUtils + { + private static readonly string DefaultUserAgent = "Microsoft.AspNetCore.SignalR.Client/0.0.0"; + private static readonly ProductInfoHeaderValue DefaultUserAgentHeader = ProductInfoHeaderValue.Parse(DefaultUserAgent); + + public static async Task SendMessages(Uri sendUrl, IChannelConnection application, HttpClient httpClient, CancellationTokenSource transportCts, ILogger logger) + { + logger.LogInformation("Starting the send loop"); + IList messages = null; + try + { + while (await application.Input.WaitToReadAsync(transportCts.Token)) + { + // Grab as many messages as we can from the channel + messages = new List(); + while (!transportCts.Token.IsCancellationRequested && application.Input.TryRead(out SendMessage message)) + { + messages.Add(message); + } + + if (messages.Count > 0) + { + logger.LogDebug("Sending {0} message(s) to the server using url: {1}", messages.Count, sendUrl); + + // Send them in a single post + var request = new HttpRequestMessage(HttpMethod.Post, sendUrl); + request.Headers.UserAgent.Add(DefaultUserAgentHeader); + + // TODO: We can probably use a pipeline here or some kind of pooled memory. + // But where do we get the pool from? ArrayBufferPool.Instance? + var memoryStream = new MemoryStream(); + + // Write the messages to the stream + var pipe = memoryStream.AsPipelineWriter(); + var output = new PipelineTextOutput(pipe, TextEncoder.Utf8); // We don't need the Encoder, but it's harmless to set. + await WriteMessagesAsync(messages, output, MessageFormat.Binary, logger); + + // Seek back to the start + memoryStream.Seek(0, SeekOrigin.Begin); + + // Set the, now filled, stream as the content + request.Content = new StreamContent(memoryStream); + request.Content.Headers.ContentType = MediaTypeHeaderValue.Parse(MessageFormatter.GetContentType(MessageFormat.Binary)); + + var response = await httpClient.SendAsync(request); + response.EnsureSuccessStatusCode(); + + logger.LogDebug("Message(s) sent successfully"); + foreach (var message in messages) + { + message.SendResult?.TrySetResult(null); + } + } + else + { + logger.LogDebug("No messages in batch to send"); + } + } + } + catch (OperationCanceledException) + { + // transport is being closed + if (messages != null) + { + foreach (var message in messages) + { + // This will no-op for any messages that were already marked as completed. + message.SendResult?.TrySetCanceled(); + } + } + } + catch (Exception ex) + { + logger.LogError("Error while sending to '{0}': {1}", sendUrl, ex); + if (messages != null) + { + foreach (var message in messages) + { + // This will no-op for any messages that were already marked as completed. + message.SendResult?.TrySetException(ex); + } + } + throw; + } + finally + { + // Make sure the poll loop is terminated + transportCts.Cancel(); + } + + logger.LogInformation("Send loop stopped"); + } + + private static async Task WriteMessagesAsync(IList messages, PipelineTextOutput output, MessageFormat format, ILogger logger) + { + output.Append(MessageFormatter.GetFormatIndicator(format), TextEncoder.Utf8); + + foreach (var message in messages) + { + logger.LogDebug("Writing '{0}' message to the server", message.Type); + + var payload = message.Payload ?? Array.Empty(); + if (!MessageFormatter.TryWriteMessage(new Message(payload, message.Type, endOfMessage: true), output, format)) + { + // We didn't get any more memory! + throw new InvalidOperationException("Unable to write message to pipeline"); + } + await output.FlushAsync(); + } + } + } +} diff --git a/src/Microsoft.AspNetCore.Sockets.Client/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/ServerSentEventsTransport.cs index 42e17344a3..33d309e583 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/ServerSentEventsTransport.cs @@ -2,20 +2,15 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; -using System.IO; using System.IO.Pipelines; -using System.IO.Pipelines.Text.Primitives; using System.Net.Http; using System.Net.Http.Headers; -using System.Text; -using System.Text.Formatting; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Sockets.Internal.Formatters; +using Microsoft.Extensions.Internal; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using Microsoft.Extensions.Internal; namespace Microsoft.AspNetCore.Sockets.Client { @@ -55,7 +50,7 @@ namespace Microsoft.AspNetCore.Sockets.Client _application = application; var sseUrl = Utils.AppendPath(url, "sse"); var sendUrl = Utils.AppendPath(url, "send"); - var sendTask = SendMessages(sendUrl, _transportCts.Token); + var sendTask = SendUtils.SendMessages(sendUrl, _application, _httpClient, _transportCts, _logger); var receiveTask = OpenConnection(_application, sseUrl, _transportCts.Token); Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t => @@ -128,101 +123,6 @@ namespace Microsoft.AspNetCore.Sockets.Client } } - private async Task SendMessages(Uri sendUrl, CancellationToken cancellationToken) - { - _logger.LogInformation("Starting the send loop"); - - List messages = null; - try - { - while (await _application.Input.WaitToReadAsync(cancellationToken)) - { - messages = new List(); - while (!cancellationToken.IsCancellationRequested && _application.Input.TryRead(out SendMessage message)) - { - messages.Add(message); - } - - if (messages.Count > 0) - { - _logger.LogDebug("Sending {messageCount} message(s) to the server using url: {url}", messages.Count, sendUrl); - - var request = new HttpRequestMessage(HttpMethod.Post, sendUrl); - request.Headers.UserAgent.Add(DefaultUserAgentHeader); - - var memoryStream = new MemoryStream(); - - var pipe = memoryStream.AsPipelineWriter(); - var output = new PipelineTextOutput(pipe, TextEncoder.Utf8); - await WriteMessagesAsync(messages, output, MessageFormat.Binary); - - memoryStream.Seek(0, SeekOrigin.Begin); - - request.Content = new StreamContent(memoryStream); - request.Content.Headers.ContentType = MediaTypeHeaderValue.Parse(MessageFormatter.GetContentType(MessageFormat.Binary)); - - var response = await _httpClient.SendAsync(request); - response.EnsureSuccessStatusCode(); - - _logger.LogDebug("Message(s) sent successfully"); - foreach (var message in messages) - { - message.SendResult?.TrySetResult(null); - } - } - } - } - catch (OperationCanceledException) - { - _logger.LogError("Send cancelled"); - - if (messages != null) - { - foreach (var message in messages) - { - message.SendResult?.TrySetCanceled(); - } - } - } - catch (Exception ex) - { - _logger.LogDebug("Error while sending to '{url}' : '{exception}'", sendUrl, ex); - if (messages != null) - { - foreach (var message in messages) - { - message.SendResult?.TrySetException(ex); - } - } - throw; - } - finally - { - // Make sure the poll loop is terminated - _transportCts.Cancel(); - } - - _logger.LogInformation("Send loop stopped"); - } - - private async Task WriteMessagesAsync(List messages, PipelineTextOutput output, MessageFormat format) - { - output.Append(MessageFormatter.GetFormatIndicator(format), TextEncoder.Utf8); - - foreach (var message in messages) - { - _logger.LogDebug("Writing '{messageType}' message to the server", message.Type); - - var payload = message.Payload ?? Array.Empty(); - if (!MessageFormatter.TryWriteMessage(new Message(payload, message.Type, endOfMessage: true), output, format)) - { - // We didn't get any more memory! - throw new InvalidOperationException("Unable to write message to pipeline"); - } - await output.FlushAsync(); - } - } - public async Task StopAsync() { _logger.LogInformation("Transport {transportName} is stopping", nameof(ServerSentEventsTransport));