From e3e78bc461683f8ec485808ffc01ea091dbc5b54 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 24 Apr 2017 17:37:30 -0700 Subject: [PATCH] Fix LibuvOutputConsumerTests (#1757) * Fix LibuvOutputConsumerTests --- .../LibuvOutputConsumerTests.cs | 510 +++++++++--------- 1 file changed, 261 insertions(+), 249 deletions(-) diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index eb99d26fb2..3d8fa5620c 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -248,291 +248,303 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyBuffered(int maxResponseBufferSize) { - var completeQueue = new ConcurrentQueue>(); - - // Arrange - _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => + await Task.Run(async () => { - completeQueue.Enqueue(triggerCompleted); - return 0; - }; + var completeQueue = new ConcurrentQueue>(); - var pipeOptions = new PipeOptions - { - ReaderScheduler = _libuvThread, - MaximumSizeHigh = maxResponseBufferSize, - MaximumSizeLow = maxResponseBufferSize, - }; - - using (var socketOutput = CreateOutputProducer(pipeOptions)) - { - var bufferSize = maxResponseBufferSize / 2; - var data = new byte[bufferSize]; - var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); - - // Act - var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); - - // Assert - // The first write should pre-complete since it is <= _maxBytesPreCompleted. - Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); - await _mockLibuv.OnPostTask; - Assert.NotEmpty(completeQueue); - - // Add more bytes to the write-behind buffer to prevent the next write from - ((ISocketOutput)socketOutput).Write((writableBuffer, state) => + // Arrange + _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => { - writableBuffer.Write(state); - }, - halfWriteBehindBuffer); + completeQueue.Enqueue(triggerCompleted); + return 0; + }; - // Act - var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); - Assert.False(writeTask2.IsCompleted); - - var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); - Assert.False(writeTask3.IsCompleted); - - // Drain the write queue - while (completeQueue.TryDequeue(out var triggerNextCompleted)) + var pipeOptions = new PipeOptions { - await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + ReaderScheduler = _libuvThread, + MaximumSizeHigh = maxResponseBufferSize, + MaximumSizeLow = maxResponseBufferSize, + }; + + using (var socketOutput = CreateOutputProducer(pipeOptions)) + { + var bufferSize = maxResponseBufferSize / 2; + var data = new byte[bufferSize]; + var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); + + // Act + var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + + // Assert + // The first write should pre-complete since it is <= _maxBytesPreCompleted. + Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); + await _mockLibuv.OnPostTask; + Assert.NotEmpty(completeQueue); + + // Add more bytes to the write-behind buffer to prevent the next write from + ((ISocketOutput) socketOutput).Write((writableBuffer, state) => + { + writableBuffer.Write(state); + }, + halfWriteBehindBuffer); + + // Act + var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + Assert.False(writeTask2.IsCompleted); + + var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + Assert.False(writeTask3.IsCompleted); + + // Drain the write queue + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } + + var timeout = TimeSpan.FromSeconds(5); + + await writeTask2.TimeoutAfter(timeout); + await writeTask3.TimeoutAfter(timeout); + + Assert.Empty(completeQueue); } - - var timeout = TimeSpan.FromSeconds(5); - - await writeTask2.TimeoutAfter(timeout); - await writeTask3.TimeoutAfter(timeout); - - Assert.Empty(completeQueue); - } + }); } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task FailedWriteCompletesOrCancelsAllPendingTasks(int maxResponseBufferSize) { - var completeQueue = new ConcurrentQueue>(); - - // Arrange - _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => + await Task.Run(async () => { - completeQueue.Enqueue(triggerCompleted); - return 0; - }; + var completeQueue = new ConcurrentQueue>(); - var abortedSource = new CancellationTokenSource(); - - var pipeOptions = new PipeOptions - { - ReaderScheduler = _libuvThread, - MaximumSizeHigh = maxResponseBufferSize, - MaximumSizeLow = maxResponseBufferSize, - }; - - using (var socketOutput = CreateOutputProducer(pipeOptions, abortedSource)) - { - var bufferSize = maxResponseBufferSize - 1; - - var data = new byte[bufferSize]; - var fullBuffer = new ArraySegment(data, 0, bufferSize); - - // Act - var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); - // task1 should complete successfully as < _maxBytesPreCompleted - - // First task is completed and successful - Assert.True(task1Success.IsCompleted); - Assert.False(task1Success.IsCanceled); - Assert.False(task1Success.IsFaulted); - - // following tasks should wait. - var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); - - // Give time for tasks to percolate - await _mockLibuv.OnPostTask; - - // Second task is not completed - Assert.False(task2Success.IsCompleted); - Assert.False(task2Success.IsCanceled); - Assert.False(task2Success.IsFaulted); - - // Third task is not completed - Assert.False(task3Canceled.IsCompleted); - Assert.False(task3Canceled.IsCanceled); - Assert.False(task3Canceled.IsFaulted); - - // Cause all writes to fail - while (completeQueue.TryDequeue(out var triggerNextCompleted)) + // Arrange + _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => { - await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted); + completeQueue.Enqueue(triggerCompleted); + return 0; + }; + + var abortedSource = new CancellationTokenSource(); + + var pipeOptions = new PipeOptions + { + ReaderScheduler = _libuvThread, + MaximumSizeHigh = maxResponseBufferSize, + MaximumSizeLow = maxResponseBufferSize, + }; + + using (var socketOutput = CreateOutputProducer(pipeOptions, abortedSource)) + { + var bufferSize = maxResponseBufferSize - 1; + + var data = new byte[bufferSize]; + var fullBuffer = new ArraySegment(data, 0, bufferSize); + + // Act + var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + // task1 should complete successfully as < _maxBytesPreCompleted + + // First task is completed and successful + Assert.True(task1Success.IsCompleted); + Assert.False(task1Success.IsCanceled); + Assert.False(task1Success.IsFaulted); + + // following tasks should wait. + var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + + // Give time for tasks to percolate + await _mockLibuv.OnPostTask; + + // Second task is not completed + Assert.False(task2Success.IsCompleted); + Assert.False(task2Success.IsCanceled); + Assert.False(task2Success.IsFaulted); + + // Third task is not completed + Assert.False(task3Canceled.IsCompleted); + Assert.False(task3Canceled.IsCanceled); + Assert.False(task3Canceled.IsFaulted); + + // Cause all writes to fail + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted); + } + + // Second task is now completed + Assert.True(task2Success.IsCompleted); + Assert.False(task2Success.IsCanceled); + Assert.False(task2Success.IsFaulted); + + // A final write guarantees that the error is observed by OutputProducer, + // but doesn't return a canceled/faulted task. + var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + Assert.True(task4Success.IsCompleted); + Assert.False(task4Success.IsCanceled); + Assert.False(task4Success.IsFaulted); + + // Third task is now canceled + await Assert.ThrowsAsync(() => task3Canceled); + Assert.True(task3Canceled.IsCanceled); + + Assert.True(abortedSource.IsCancellationRequested); } - - // Second task is now completed - Assert.True(task2Success.IsCompleted); - Assert.False(task2Success.IsCanceled); - Assert.False(task2Success.IsFaulted); - - // A final write guarantees that the error is observed by OutputProducer, - // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); - Assert.True(task4Success.IsCompleted); - Assert.False(task4Success.IsCanceled); - Assert.False(task4Success.IsFaulted); - - // Third task is now canceled - await Assert.ThrowsAsync(() => task3Canceled); - Assert.True(task3Canceled.IsCanceled); - - Assert.True(abortedSource.IsCancellationRequested); - } + }); } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task CancelsBeforeWriteRequestCompletes(int maxResponseBufferSize) { - var completeQueue = new ConcurrentQueue>(); - - // Arrange - _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => + await Task.Run(async () => { - completeQueue.Enqueue(triggerCompleted); - return 0; - }; + var completeQueue = new ConcurrentQueue>(); - var abortedSource = new CancellationTokenSource(); - - var pipeOptions = new PipeOptions - { - ReaderScheduler = _libuvThread, - MaximumSizeHigh = maxResponseBufferSize, - MaximumSizeLow = maxResponseBufferSize, - }; - - using (var socketOutput = CreateOutputProducer(pipeOptions)) - { - var bufferSize = maxResponseBufferSize - 1; - - var data = new byte[bufferSize]; - var fullBuffer = new ArraySegment(data, 0, bufferSize); - - // Act - var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); - // task1 should complete successfully as < _maxBytesPreCompleted - - // First task is completed and successful - Assert.True(task1Success.IsCompleted); - Assert.False(task1Success.IsCanceled); - Assert.False(task1Success.IsFaulted); - - // following tasks should wait. - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); - - // Give time for tasks to percolate - await _mockLibuv.OnPostTask; - - // Third task is not completed - Assert.False(task3Canceled.IsCompleted); - Assert.False(task3Canceled.IsCanceled); - Assert.False(task3Canceled.IsFaulted); - - abortedSource.Cancel(); - - // Complete writes - while (completeQueue.TryDequeue(out var triggerNextCompleted)) + // Arrange + _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => { - await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + completeQueue.Enqueue(triggerCompleted); + return 0; + }; + + var abortedSource = new CancellationTokenSource(); + + var pipeOptions = new PipeOptions + { + ReaderScheduler = _libuvThread, + MaximumSizeHigh = maxResponseBufferSize, + MaximumSizeLow = maxResponseBufferSize, + }; + + using (var socketOutput = CreateOutputProducer(pipeOptions)) + { + var bufferSize = maxResponseBufferSize - 1; + + var data = new byte[bufferSize]; + var fullBuffer = new ArraySegment(data, 0, bufferSize); + + // Act + var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + // task1 should complete successfully as < _maxBytesPreCompleted + + // First task is completed and successful + Assert.True(task1Success.IsCompleted); + Assert.False(task1Success.IsCanceled); + Assert.False(task1Success.IsFaulted); + + // following tasks should wait. + var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + + // Give time for tasks to percolate + await _mockLibuv.OnPostTask; + + // Third task is not completed + Assert.False(task3Canceled.IsCompleted); + Assert.False(task3Canceled.IsCanceled); + Assert.False(task3Canceled.IsFaulted); + + abortedSource.Cancel(); + + // Complete writes + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } + + // A final write guarantees that the error is observed by OutputProducer, + // but doesn't return a canceled/faulted task. + var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + Assert.True(task4Success.IsCompleted); + Assert.False(task4Success.IsCanceled); + Assert.False(task4Success.IsFaulted); + + // Third task is now canceled + await Assert.ThrowsAsync(() => task3Canceled); + Assert.True(task3Canceled.IsCanceled); + + Assert.True(abortedSource.IsCancellationRequested); } - - // A final write guarantees that the error is observed by OutputProducer, - // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); - Assert.True(task4Success.IsCompleted); - Assert.False(task4Success.IsCanceled); - Assert.False(task4Success.IsFaulted); - - // Third task is now canceled - await Assert.ThrowsAsync(() => task3Canceled); - Assert.True(task3Canceled.IsCanceled); - - Assert.True(abortedSource.IsCancellationRequested); - } + }); } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task WriteAsyncWithTokenAfterCallWithoutIsCancelled(int maxResponseBufferSize) { - var completeQueue = new ConcurrentQueue>(); - - // Arrange - _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => + await Task.Run(async () => { - completeQueue.Enqueue(triggerCompleted); - return 0; - }; + var completeQueue = new ConcurrentQueue>(); - var abortedSource = new CancellationTokenSource(); - - var pipeOptions = new PipeOptions - { - ReaderScheduler = _libuvThread, - MaximumSizeHigh = maxResponseBufferSize, - MaximumSizeLow = maxResponseBufferSize, - }; - - using (var socketOutput = CreateOutputProducer(pipeOptions)) - { - var bufferSize = maxResponseBufferSize; - - var data = new byte[bufferSize]; - var fullBuffer = new ArraySegment(data, 0, bufferSize); - - // Act - var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); - - // First task is not completed - Assert.False(task1Waits.IsCompleted); - Assert.False(task1Waits.IsCanceled); - Assert.False(task1Waits.IsFaulted); - - // following tasks should wait. - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); - - // Give time for tasks to percolate - await _mockLibuv.OnPostTask; - - // Third task is not completed - Assert.False(task3Canceled.IsCompleted); - Assert.False(task3Canceled.IsCanceled); - Assert.False(task3Canceled.IsFaulted); - - abortedSource.Cancel(); - - // Complete writes - while (completeQueue.TryDequeue(out var triggerNextCompleted)) + // Arrange + _mockLibuv.OnWrite = (socket, buffers, triggerCompleted) => { - await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + completeQueue.Enqueue(triggerCompleted); + return 0; + }; + + var abortedSource = new CancellationTokenSource(); + + var pipeOptions = new PipeOptions + { + ReaderScheduler = _libuvThread, + MaximumSizeHigh = maxResponseBufferSize, + MaximumSizeLow = maxResponseBufferSize, + }; + + using (var socketOutput = CreateOutputProducer(pipeOptions)) + { + var bufferSize = maxResponseBufferSize; + + var data = new byte[bufferSize]; + var fullBuffer = new ArraySegment(data, 0, bufferSize); + + // Act + var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + + // First task is not completed + Assert.False(task1Waits.IsCompleted); + Assert.False(task1Waits.IsCanceled); + Assert.False(task1Waits.IsFaulted); + + // following tasks should wait. + var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + + // Give time for tasks to percolate + await _mockLibuv.OnPostTask; + + // Third task is not completed + Assert.False(task3Canceled.IsCompleted); + Assert.False(task3Canceled.IsCanceled); + Assert.False(task3Canceled.IsFaulted); + + abortedSource.Cancel(); + + // Complete writes + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } + + // First task is completed + Assert.True(task1Waits.IsCompleted); + Assert.False(task1Waits.IsCanceled); + Assert.False(task1Waits.IsFaulted); + + // A final write guarantees that the error is observed by OutputProducer, + // but doesn't return a canceled/faulted task. + var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + Assert.True(task4Success.IsCompleted); + Assert.False(task4Success.IsCanceled); + Assert.False(task4Success.IsFaulted); + + // Third task is now canceled + await Assert.ThrowsAsync(() => task3Canceled); + Assert.True(task3Canceled.IsCanceled); } - - // First task is completed - Assert.True(task1Waits.IsCompleted); - Assert.False(task1Waits.IsCanceled); - Assert.False(task1Waits.IsFaulted); - - // A final write guarantees that the error is observed by OutputProducer, - // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); - Assert.True(task4Success.IsCompleted); - Assert.False(task4Success.IsCanceled); - Assert.False(task4Success.IsFaulted); - - // Third task is now canceled - await Assert.ThrowsAsync(() => task3Canceled); - Assert.True(task3Canceled.IsCanceled); - } + }); } [Theory]