// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { public class OutputProducer : IDisposable { private static readonly ArraySegment _emptyData = new ArraySegment(new byte[0]); private readonly string _connectionId; private readonly ITimeoutControl _timeoutControl; private readonly IKestrelTrace _log; // This locks access to to all of the below fields private readonly object _contextLock = new object(); private bool _completed = false; private readonly IPipe _pipe; // https://github.com/dotnet/corefxlab/issues/1334 // Pipelines don't support multiple awaiters on flush // this is temporary until it does private TaskCompletionSource _flushTcs; private readonly object _flushLock = new object(); private Action _flushCompleted; public OutputProducer( IPipe pipe, string connectionId, IKestrelTrace log, ITimeoutControl timeoutControl) { _pipe = pipe; _connectionId = connectionId; _timeoutControl = timeoutControl; _log = log; _flushCompleted = OnFlushCompleted; } public Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)) { if (cancellationToken.IsCancellationRequested) { return Task.FromCanceled(cancellationToken); } return WriteAsync(buffer, cancellationToken, chunk); } public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) { return WriteAsync(_emptyData, cancellationToken); } public void Write(Action callback, T state) { lock (_contextLock) { if (_completed) { return; } var buffer = _pipe.Writer.Alloc(1); callback(buffer, state); buffer.Commit(); } } public void Dispose() { lock (_contextLock) { if (_completed) { return; } _log.ConnectionDisconnect(_connectionId); _completed = true; _pipe.Writer.Complete(); } } public void Abort(Exception error) { lock (_contextLock) { if (_completed) { return; } _log.ConnectionDisconnect(_connectionId); _completed = true; _pipe.Reader.CancelPendingRead(); _pipe.Writer.Complete(error); } } private Task WriteAsync( ArraySegment buffer, CancellationToken cancellationToken, bool chunk = false) { var writableBuffer = default(WritableBuffer); lock (_contextLock) { if (_completed) { return Task.CompletedTask; } writableBuffer = _pipe.Writer.Alloc(1); var writer = new WritableBufferWriter(writableBuffer); if (buffer.Count > 0) { if (chunk) { ChunkWriter.WriteBeginChunkBytes(ref writer, buffer.Count); } writer.Write(buffer.Array, buffer.Offset, buffer.Count); if (chunk) { ChunkWriter.WriteEndChunkBytes(ref writer); } } writableBuffer.Commit(); } return FlushAsync(writableBuffer, cancellationToken); } private Task FlushAsync(WritableBuffer writableBuffer, CancellationToken cancellationToken) { var awaitable = writableBuffer.FlushAsync(cancellationToken); if (awaitable.IsCompleted) { // The flush task can't fail today return Task.CompletedTask; } return FlushAsyncAwaited(awaitable, writableBuffer.BytesWritten, cancellationToken); } private async Task FlushAsyncAwaited(WritableBufferAwaitable awaitable, long count, CancellationToken cancellationToken) { // https://github.com/dotnet/corefxlab/issues/1334 // Since the flush awaitable doesn't currently support multiple awaiters // we need to use a task to track the callbacks. // All awaiters get the same task lock (_flushLock) { if (_flushTcs == null || _flushTcs.Task.IsCompleted) { _flushTcs = new TaskCompletionSource(); awaitable.OnCompleted(_flushCompleted); } } _timeoutControl.StartTimingWrite(count); await _flushTcs.Task; _timeoutControl.StopTimingWrite(); cancellationToken.ThrowIfCancellationRequested(); } private void OnFlushCompleted() { _flushTcs.TrySetResult(null); } } }