From 489bd80b881c383dbb6f2a97de077f4ec0cd8932 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 12 Mar 2018 12:50:42 -0700 Subject: [PATCH] Remove threshold from client SSE pipe (#1577) --- .../Internal/PipelineReaderExtensions.cs | 34 -------- .../Internal/StreamExtensions.cs | 78 ++----------------- .../Internal/StreamPipeConnection.cs | 33 +------- .../ServerSentEventsMessageParser.cs | 2 +- .../ServerSentEventsTransport.cs | 4 +- 5 files changed, 13 insertions(+), 138 deletions(-) delete mode 100644 src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs deleted file mode 100644 index c6392b4afa..0000000000 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System.Threading; -using System.Threading.Tasks; - -namespace System.IO.Pipelines -{ - internal static class PipelineReaderExtensions - { - public static async Task CopyToAsync(this PipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken) - { - // TODO: Use bufferSize argument - while (!cancellationToken.IsCancellationRequested) - { - var result = await input.ReadAsync(); - var inputBuffer = result.Buffer; - try - { - if (inputBuffer.IsEmpty && result.IsCompleted) - { - return; - } - - await inputBuffer.CopyToAsync(stream); - } - finally - { - input.AdvanceTo(inputBuffer.End); - } - } - } - } -} diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs index 3da3700313..8adf707c49 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Buffers; -using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -10,19 +8,6 @@ namespace System.IO.Pipelines { internal static class StreamExtensions { - /// - /// Copies the content of a into a . - /// - /// - /// - /// - /// - public static Task CopyToAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default) - { - // 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken - return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken); - } - public static async Task CopyToEndAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default) { try @@ -38,62 +23,16 @@ namespace System.IO.Pipelines } /// - /// Copies a to a asynchronously + /// Copies the content of a into a . /// - /// The to copy - /// The target + /// + /// + /// /// - public static Task CopyToAsync(this ReadOnlySequence buffer, Stream stream) + private static Task CopyToAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default) { - if (buffer.IsSingleSegment) - { - return WriteToStream(stream, buffer.First); - } - - return CopyMultipleToStreamAsync(buffer, stream); - } - - private static async Task CopyMultipleToStreamAsync(this ReadOnlySequence buffer, Stream stream) - { - foreach (var memory in buffer) - { - await WriteToStream(stream, memory); - } - } - - private static async Task WriteToStream(Stream stream, ReadOnlyMemory readOnlyMemory) - { - var memory = MemoryMarshal.AsMemory(readOnlyMemory); - if (MemoryMarshal.TryGetArray(memory, out ArraySegment data)) - { - await stream.WriteAsync(data.Array, data.Offset, data.Count) - .ConfigureAwait(continueOnCapturedContext: false); - } - else - { - // Copy required - var array = memory.Span.ToArray(); - await stream.WriteAsync(array, 0, array.Length).ConfigureAwait(continueOnCapturedContext: false); - } - } - - public static Task CopyToEndAsync(this PipeReader input, Stream stream) - { - return input.CopyToEndAsync(stream, 4096, CancellationToken.None); - } - - public static async Task CopyToEndAsync(this PipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken) - { - try - { - await input.CopyToAsync(stream, bufferSize, cancellationToken); - } - catch (Exception ex) - { - input.Complete(ex); - return; - } - return; + // 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken + return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken); } private class PipelineWriterStream : Stream @@ -148,8 +87,7 @@ namespace System.IO.Pipelines { cancellationToken.ThrowIfCancellationRequested(); - _writer.Write(new ReadOnlySpan(buffer, offset, count)); - await _writer.FlushAsync(cancellationToken); + await _writer.WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken); } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs index b600a85a4a..69534a2e8a 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs @@ -3,24 +3,8 @@ namespace System.IO.Pipelines { - internal class StreamPipeConnection : IDuplexPipe + internal class StreamPipeConnection { - public StreamPipeConnection(PipeOptions options, Stream stream) - { - Input = CreateReader(options, stream); - Output = CreateWriter(options, stream); - } - - public PipeReader Input { get; } - - public PipeWriter Output { get; } - - public void Dispose() - { - Input.Complete(); - Output.Complete(); - } - public static PipeReader CreateReader(PipeOptions options, Stream stream) { if (!stream.CanRead) @@ -29,22 +13,9 @@ namespace System.IO.Pipelines } var pipe = new Pipe(options); - var ignore = stream.CopyToEndAsync(pipe.Writer); + _ = stream.CopyToEndAsync(pipe.Writer); return pipe.Reader; } - - public static PipeWriter CreateWriter(PipeOptions options, Stream stream) - { - if (!stream.CanWrite) - { - throw new NotSupportedException(); - } - - var pipe = new Pipe(options); - var ignore = pipe.Reader.CopyToEndAsync(stream); - - return pipe.Writer; - } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs index 4346696f48..2a150d2ff3 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs @@ -35,7 +35,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters { if (!(buffer.PositionOf(ByteLF) is SequencePosition lineEnd)) { - // For the case of data: Foo\r\n\r\ + // For the case of data: Foo\r\n\r\ if (_internalParserState == InternalParseState.ReadEndOfMessage) { if (ConvertBufferToSpan(buffer.Slice(start, buffer.End)).Length > 1) diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs index 83836e2dc5..e2646041a2 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs @@ -82,7 +82,8 @@ namespace Microsoft.AspNetCore.Sockets.Client using (var stream = await response.Content.ReadAsStreamAsync()) { - var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream); + var pipeOptions = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0); + var pipelineReader = StreamPipeConnection.CreateReader(pipeOptions, stream); var readCancellationRegistration = cancellationToken.Register( reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader); try @@ -99,7 +100,6 @@ namespace Microsoft.AspNetCore.Sockets.Client var consumed = input.Start; var examined = input.End; - try { Log.ParsingSSE(_logger, input.Length);