diff --git a/build/dependencies.props b/build/dependencies.props index 68bebcfcbf..4c9180b594 100644 --- a/build/dependencies.props +++ b/build/dependencies.props @@ -60,7 +60,6 @@ 10.0.1 1.2.4 4.5.0-preview1-26102-01 - 0.1.0-e180104-2 0.1.0-e180104-2 4.5.0-preview1-26102-01 4.5.0-preview1-26102-01 diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs new file mode 100644 index 0000000000..f8e486d4ba --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/PipelineReaderExtensions.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + internal static class PipelineReaderExtensions + { + public static async Task CopyToAsync(this IPipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken) + { + // TODO: Use bufferSize argument + while (!cancellationToken.IsCancellationRequested) + { + var result = await input.ReadAsync(); + var inputBuffer = result.Buffer; + try + { + if (inputBuffer.IsEmpty && result.IsCompleted) + { + return; + } + + await inputBuffer.CopyToAsync(stream); + } + finally + { + input.Advance(inputBuffer.End); + } + } + } + } +} diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs new file mode 100644 index 0000000000..94de2c4638 --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamExtensions.cs @@ -0,0 +1,157 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Buffers; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + internal static class StreamExtensions + { + /// + /// Copies the content of a into a . + /// + /// + /// + /// + /// + public static Task CopyToAsync(this Stream stream, IPipeWriter writer, CancellationToken cancellationToken = default) + { + // 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken + return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken); + } + + public static async Task CopyToEndAsync(this Stream stream, IPipeWriter writer, CancellationToken cancellationToken = default) + { + try + { + await stream.CopyToAsync(writer, cancellationToken); + } + catch (Exception ex) + { + writer.Complete(ex); + return; + } + writer.Complete(); + } + + /// + /// Copies a to a asynchronously + /// + /// The to copy + /// The target + /// + public static Task CopyToAsync(this ReadOnlyBuffer buffer, Stream stream) + { + if (buffer.IsSingleSpan) + { + return WriteToStream(stream, buffer.First); + } + + return CopyMultipleToStreamAsync(buffer, stream); + } + + private static async Task CopyMultipleToStreamAsync(this ReadOnlyBuffer buffer, Stream stream) + { + foreach (var memory in buffer) + { + await WriteToStream(stream, memory); + } + } + + private static async Task WriteToStream(Stream stream, ReadOnlyMemory readOnlyMemory) + { + var memory = MemoryMarshal.AsMemory(readOnlyMemory); + if (memory.TryGetArray(out ArraySegment data)) + { + await stream.WriteAsync(data.Array, data.Offset, data.Count) + .ConfigureAwait(continueOnCapturedContext: false); + } + else + { + // Copy required + var array = memory.Span.ToArray(); + await stream.WriteAsync(array, 0, array.Length).ConfigureAwait(continueOnCapturedContext: false); + } + } + + public static Task CopyToEndAsync(this IPipeReader input, Stream stream) + { + return input.CopyToEndAsync(stream, 4096, CancellationToken.None); + } + + public static async Task CopyToEndAsync(this IPipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken) + { + try + { + await input.CopyToAsync(stream, bufferSize, cancellationToken); + } + catch (Exception ex) + { + input.Complete(ex); + return; + } + return; + } + + private class PipelineWriterStream : Stream + { + private readonly IPipeWriter _writer; + + public PipelineWriterStream(IPipeWriter writer) + { + _writer = writer; + } + + public override bool CanRead => false; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + var output = _writer.Alloc(); + output.Write(new ReadOnlySpan(buffer, offset, count)); + await output.FlushAsync(cancellationToken); + } + } + } +} diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs new file mode 100644 index 0000000000..c2e212f931 --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/StreamPipeConnection.cs @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace System.IO.Pipelines +{ + internal class StreamPipeConnection : IPipeConnection + { + public StreamPipeConnection(PipeOptions options, Stream stream) + { + Input = CreateReader(options, stream); + Output = CreateWriter(options, stream); + } + + public IPipeReader Input { get; } + + public IPipeWriter Output { get; } + + public void Dispose() + { + Input.Complete(); + Output.Complete(); + } + + public static IPipeReader CreateReader(PipeOptions options, Stream stream) + { + if (!stream.CanRead) + { + throw new NotSupportedException(); + } + + var pipe = new Pipe(options); + var ignore = stream.CopyToEndAsync(pipe.Writer); + + return pipe.Reader; + } + + public static IPipeWriter CreateWriter(PipeOptions options, Stream stream) + { + if (!stream.CanWrite) + { + throw new NotSupportedException(); + } + + var pipe = new Pipe(options); + var ignore = pipe.Reader.CopyToEndAsync(stream); + + return pipe.Writer; + } + } +} diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj index 023a6f85ba..74c8605cd9 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj @@ -18,7 +18,6 @@ -