// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; using System.IO.Pipelines; using System.IO; using System.Threading; using System.Threading.Tasks; using System.Buffers; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { public class RawStream : Stream { private readonly PipeReader _input; private readonly PipeWriter _output; public RawStream(PipeReader input, PipeWriter output) { _input = input; _output = output; } public override bool CanRead => true; public override bool CanSeek => false; public override bool CanWrite => true; public override long Length { get { throw new NotSupportedException(); } } public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override int Read(byte[] buffer, int offset, int count) { // ValueTask uses .GetAwaiter().GetResult() if necessary // https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156 return ReadAsyncInternal(new Memory(buffer, offset, count)).Result; } public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { return ReadAsyncInternal(new Memory(buffer, offset, count)).AsTask(); } #if NETCOREAPP2_1 public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) { return ReadAsyncInternal(destination); } #endif public override void Write(byte[] buffer, int offset, int count) { WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (buffer != null) { _output.Write(new ReadOnlySpan(buffer, offset, count)); } await _output.FlushAsync(cancellationToken); } #if NETCOREAPP2_1 public override async Task WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) { _output.Write(source.Span); await _output.FlushAsync(cancellationToken); } #endif public override void Flush() { FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); } public override Task FlushAsync(CancellationToken cancellationToken) { return WriteAsync(null, 0, 0, cancellationToken); } private async ValueTask ReadAsyncInternal(Memory destination) { while (true) { var result = await _input.ReadAsync(); var readableBuffer = result.Buffer; try { if (!readableBuffer.IsEmpty) { // buffer.Count is int var count = (int) Math.Min(readableBuffer.Length, destination.Length); readableBuffer = readableBuffer.Slice(0, count); readableBuffer.CopyTo(destination.Span); return count; } else if (result.IsCompleted) { return 0; } } finally { _input.AdvanceTo(readableBuffer.End, readableBuffer.End); } } } public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { var task = ReadAsync(buffer, offset, count, default(CancellationToken), state); if (callback != null) { task.ContinueWith(t => callback.Invoke(t)); } return task; } public override int EndRead(IAsyncResult asyncResult) { return ((Task)asyncResult).GetAwaiter().GetResult(); } private Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) { var tcs = new TaskCompletionSource(state); var task = ReadAsync(buffer, offset, count, cancellationToken); task.ContinueWith((task2, state2) => { var tcs2 = (TaskCompletionSource)state2; if (task2.IsCanceled) { tcs2.SetCanceled(); } else if (task2.IsFaulted) { tcs2.SetException(task2.Exception); } else { tcs2.SetResult(task2.Result); } }, tcs, cancellationToken); return tcs.Task; } public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { var task = WriteAsync(buffer, offset, count, default(CancellationToken), state); if (callback != null) { task.ContinueWith(t => callback.Invoke(t)); } return task; } public override void EndWrite(IAsyncResult asyncResult) { ((Task)asyncResult).GetAwaiter().GetResult(); } private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) { var tcs = new TaskCompletionSource(state); var task = WriteAsync(buffer, offset, count, cancellationToken); task.ContinueWith((task2, state2) => { var tcs2 = (TaskCompletionSource)state2; if (task2.IsCanceled) { tcs2.SetCanceled(); } else if (task2.IsFaulted) { tcs2.SetException(task2.Exception); } else { tcs2.SetResult(null); } }, tcs, cancellationToken); return tcs.Task; } } }