Fix LibuvOutputConsumerTests (#1757)

* Fix LibuvOutputConsumerTests
This commit is contained in:
Pavel Krymets 2017-04-24 17:37:30 -07:00 committed by David Fowler
parent 9e80fb65bd
commit e3e78bc461
1 changed files with 261 additions and 249 deletions

View File

@ -248,291 +248,303 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyBuffered(int maxResponseBufferSize)
{
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
await Task.Run(async () =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var completeQueue = new ConcurrentQueue<Action<int>>();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize / 2;
var data = new byte[bufferSize];
var halfWriteBehindBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
// Assert
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status);
await _mockLibuv.OnPostTask;
Assert.NotEmpty(completeQueue);
// Add more bytes to the write-behind buffer to prevent the next write from
((ISocketOutput)socketOutput).Write((writableBuffer, state) =>
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
{
writableBuffer.Write(state);
},
halfWriteBehindBuffer);
completeQueue.Enqueue(triggerCompleted);
return 0;
};
// Act
var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
Assert.False(writeTask2.IsCompleted);
var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
Assert.False(writeTask3.IsCompleted);
// Drain the write queue
while (completeQueue.TryDequeue(out var triggerNextCompleted))
var pipeOptions = new PipeOptions
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize / 2;
var data = new byte[bufferSize];
var halfWriteBehindBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
// Assert
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status);
await _mockLibuv.OnPostTask;
Assert.NotEmpty(completeQueue);
// Add more bytes to the write-behind buffer to prevent the next write from
((ISocketOutput) socketOutput).Write((writableBuffer, state) =>
{
writableBuffer.Write(state);
},
halfWriteBehindBuffer);
// Act
var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
Assert.False(writeTask2.IsCompleted);
var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
Assert.False(writeTask3.IsCompleted);
// Drain the write queue
while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
var timeout = TimeSpan.FromSeconds(5);
await writeTask2.TimeoutAfter(timeout);
await writeTask3.TimeoutAfter(timeout);
Assert.Empty(completeQueue);
}
var timeout = TimeSpan.FromSeconds(5);
await writeTask2.TimeoutAfter(timeout);
await writeTask3.TimeoutAfter(timeout);
Assert.Empty(completeQueue);
}
});
}
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task FailedWriteCompletesOrCancelsAllPendingTasks(int maxResponseBufferSize)
{
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
await Task.Run(async () =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var completeQueue = new ConcurrentQueue<Action<int>>();
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions, abortedSource))
{
var bufferSize = maxResponseBufferSize - 1;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// task1 should complete successfully as < _maxBytesPreCompleted
// First task is completed and successful
Assert.True(task1Success.IsCompleted);
Assert.False(task1Success.IsCanceled);
Assert.False(task1Success.IsFaulted);
// following tasks should wait.
var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Second task is not completed
Assert.False(task2Success.IsCompleted);
Assert.False(task2Success.IsCanceled);
Assert.False(task2Success.IsFaulted);
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
// Cause all writes to fail
while (completeQueue.TryDequeue(out var triggerNextCompleted))
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
{
await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted);
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions, abortedSource))
{
var bufferSize = maxResponseBufferSize - 1;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// task1 should complete successfully as < _maxBytesPreCompleted
// First task is completed and successful
Assert.True(task1Success.IsCompleted);
Assert.False(task1Success.IsCanceled);
Assert.False(task1Success.IsFaulted);
// following tasks should wait.
var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Second task is not completed
Assert.False(task2Success.IsCompleted);
Assert.False(task2Success.IsCanceled);
Assert.False(task2Success.IsFaulted);
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
// Cause all writes to fail
while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted);
}
// Second task is now completed
Assert.True(task2Success.IsCompleted);
Assert.False(task2Success.IsCanceled);
Assert.False(task2Success.IsFaulted);
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
Assert.True(abortedSource.IsCancellationRequested);
}
// Second task is now completed
Assert.True(task2Success.IsCompleted);
Assert.False(task2Success.IsCanceled);
Assert.False(task2Success.IsFaulted);
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
Assert.True(abortedSource.IsCancellationRequested);
}
});
}
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task CancelsBeforeWriteRequestCompletes(int maxResponseBufferSize)
{
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
await Task.Run(async () =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var completeQueue = new ConcurrentQueue<Action<int>>();
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize - 1;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// task1 should complete successfully as < _maxBytesPreCompleted
// First task is completed and successful
Assert.True(task1Success.IsCompleted);
Assert.False(task1Success.IsCanceled);
Assert.False(task1Success.IsFaulted);
// following tasks should wait.
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
abortedSource.Cancel();
// Complete writes
while (completeQueue.TryDequeue(out var triggerNextCompleted))
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize - 1;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// task1 should complete successfully as < _maxBytesPreCompleted
// First task is completed and successful
Assert.True(task1Success.IsCompleted);
Assert.False(task1Success.IsCanceled);
Assert.False(task1Success.IsFaulted);
// following tasks should wait.
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
abortedSource.Cancel();
// Complete writes
while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
Assert.True(abortedSource.IsCancellationRequested);
}
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
Assert.True(abortedSource.IsCancellationRequested);
}
});
}
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task WriteAsyncWithTokenAfterCallWithoutIsCancelled(int maxResponseBufferSize)
{
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
await Task.Run(async () =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var completeQueue = new ConcurrentQueue<Action<int>>();
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
// First task is not completed
Assert.False(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
Assert.False(task1Waits.IsFaulted);
// following tasks should wait.
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
abortedSource.Cancel();
// Complete writes
while (completeQueue.TryDequeue(out var triggerNextCompleted))
// Arrange
_mockLibuv.OnWrite = (socket, buffers, triggerCompleted) =>
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
completeQueue.Enqueue(triggerCompleted);
return 0;
};
var abortedSource = new CancellationTokenSource();
var pipeOptions = new PipeOptions
{
ReaderScheduler = _libuvThread,
MaximumSizeHigh = maxResponseBufferSize,
MaximumSizeLow = maxResponseBufferSize,
};
using (var socketOutput = CreateOutputProducer(pipeOptions))
{
var bufferSize = maxResponseBufferSize;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
// First task is not completed
Assert.False(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
Assert.False(task1Waits.IsFaulted);
// following tasks should wait.
var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
abortedSource.Cancel();
// Complete writes
while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
// First task is completed
Assert.True(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
Assert.False(task1Waits.IsFaulted);
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
}
// First task is completed
Assert.True(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
Assert.False(task1Waits.IsFaulted);
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken));
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
}
});
}
[Theory]