diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs
deleted file mode 100644
index c6392b4afa..0000000000
--- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace System.IO.Pipelines
-{
- internal static class PipelineReaderExtensions
- {
- public static async Task CopyToAsync(this PipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken)
- {
- // TODO: Use bufferSize argument
- while (!cancellationToken.IsCancellationRequested)
- {
- var result = await input.ReadAsync();
- var inputBuffer = result.Buffer;
- try
- {
- if (inputBuffer.IsEmpty && result.IsCompleted)
- {
- return;
- }
-
- await inputBuffer.CopyToAsync(stream);
- }
- finally
- {
- input.AdvanceTo(inputBuffer.End);
- }
- }
- }
- }
-}
diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs
index 3da3700313..8adf707c49 100644
--- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs
+++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs
@@ -1,8 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-using System.Buffers;
-using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -10,19 +8,6 @@ namespace System.IO.Pipelines
{
internal static class StreamExtensions
{
- ///
- /// Copies the content of a into a .
- ///
- ///
- ///
- ///
- ///
- public static Task CopyToAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
- {
- // 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken
- return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken);
- }
-
public static async Task CopyToEndAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
try
@@ -38,62 +23,16 @@ namespace System.IO.Pipelines
}
///
- /// Copies a to a asynchronously
+ /// Copies the content of a into a .
///
- /// The to copy
- /// The target
+ ///
+ ///
+ ///
///
- public static Task CopyToAsync(this ReadOnlySequence buffer, Stream stream)
+ private static Task CopyToAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
- if (buffer.IsSingleSegment)
- {
- return WriteToStream(stream, buffer.First);
- }
-
- return CopyMultipleToStreamAsync(buffer, stream);
- }
-
- private static async Task CopyMultipleToStreamAsync(this ReadOnlySequence buffer, Stream stream)
- {
- foreach (var memory in buffer)
- {
- await WriteToStream(stream, memory);
- }
- }
-
- private static async Task WriteToStream(Stream stream, ReadOnlyMemory readOnlyMemory)
- {
- var memory = MemoryMarshal.AsMemory(readOnlyMemory);
- if (MemoryMarshal.TryGetArray(memory, out ArraySegment data))
- {
- await stream.WriteAsync(data.Array, data.Offset, data.Count)
- .ConfigureAwait(continueOnCapturedContext: false);
- }
- else
- {
- // Copy required
- var array = memory.Span.ToArray();
- await stream.WriteAsync(array, 0, array.Length).ConfigureAwait(continueOnCapturedContext: false);
- }
- }
-
- public static Task CopyToEndAsync(this PipeReader input, Stream stream)
- {
- return input.CopyToEndAsync(stream, 4096, CancellationToken.None);
- }
-
- public static async Task CopyToEndAsync(this PipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken)
- {
- try
- {
- await input.CopyToAsync(stream, bufferSize, cancellationToken);
- }
- catch (Exception ex)
- {
- input.Complete(ex);
- return;
- }
- return;
+ // 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken
+ return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken);
}
private class PipelineWriterStream : Stream
@@ -148,8 +87,7 @@ namespace System.IO.Pipelines
{
cancellationToken.ThrowIfCancellationRequested();
- _writer.Write(new ReadOnlySpan(buffer, offset, count));
- await _writer.FlushAsync(cancellationToken);
+ await _writer.WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken);
}
}
}
diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs
index b600a85a4a..69534a2e8a 100644
--- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs
+++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs
@@ -3,24 +3,8 @@
namespace System.IO.Pipelines
{
- internal class StreamPipeConnection : IDuplexPipe
+ internal class StreamPipeConnection
{
- public StreamPipeConnection(PipeOptions options, Stream stream)
- {
- Input = CreateReader(options, stream);
- Output = CreateWriter(options, stream);
- }
-
- public PipeReader Input { get; }
-
- public PipeWriter Output { get; }
-
- public void Dispose()
- {
- Input.Complete();
- Output.Complete();
- }
-
public static PipeReader CreateReader(PipeOptions options, Stream stream)
{
if (!stream.CanRead)
@@ -29,22 +13,9 @@ namespace System.IO.Pipelines
}
var pipe = new Pipe(options);
- var ignore = stream.CopyToEndAsync(pipe.Writer);
+ _ = stream.CopyToEndAsync(pipe.Writer);
return pipe.Reader;
}
-
- public static PipeWriter CreateWriter(PipeOptions options, Stream stream)
- {
- if (!stream.CanWrite)
- {
- throw new NotSupportedException();
- }
-
- var pipe = new Pipe(options);
- var ignore = pipe.Reader.CopyToEndAsync(stream);
-
- return pipe.Writer;
- }
}
}
diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
index 4346696f48..2a150d2ff3 100644
--- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
+++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
@@ -35,7 +35,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
{
if (!(buffer.PositionOf(ByteLF) is SequencePosition lineEnd))
{
- // For the case of data: Foo\r\n\r\
+ // For the case of data: Foo\r\n\r\
if (_internalParserState == InternalParseState.ReadEndOfMessage)
{
if (ConvertBufferToSpan(buffer.Slice(start, buffer.End)).Length > 1)
diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
index 83836e2dc5..e2646041a2 100644
--- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
+++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
@@ -82,7 +82,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
using (var stream = await response.Content.ReadAsStreamAsync())
{
- var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream);
+ var pipeOptions = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0);
+ var pipelineReader = StreamPipeConnection.CreateReader(pipeOptions, stream);
var readCancellationRegistration = cancellationToken.Register(
reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader);
try
@@ -99,7 +100,6 @@ namespace Microsoft.AspNetCore.Sockets.Client
var consumed = input.Start;
var examined = input.End;
-
try
{
Log.ParsingSSE(_logger, input.Length);