Fix ConcurrentPipeWriterTests (#12383)
This commit is contained in:
parent
c54a7209d7
commit
34903da1e9
|
|
@ -25,6 +25,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
};
|
||||
|
||||
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
|
||||
|
||||
// No need to pass in a real sync object since all the calls in this test are passthrough.
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
|
||||
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
|
|
@ -71,24 +73,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
|
||||
};
|
||||
|
||||
var sync = new object();
|
||||
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
|
||||
var flushTask0 = default(ValueTask<FlushResult>);
|
||||
var flushTask1 = default(ValueTask<FlushResult>);
|
||||
var completeTask = default(ValueTask);
|
||||
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
lock (sync)
|
||||
{
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
|
||||
var flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
|
||||
// Since the flush was not awaited, the following API calls are queued.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
var flushTask1 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
|
||||
// Since the flush was not awaited, the following API calls are queued.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
flushTask1 = concurrentPipeWriter.FlushAsync();
|
||||
}
|
||||
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
|
|
@ -99,11 +108,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
|
||||
mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
pipeWriterFlushTcsArray[0].SetResult(default);
|
||||
|
||||
await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
|
||||
|
||||
// Since the flush was not awaited, the following API calls are queued.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
lock (sync)
|
||||
{
|
||||
// Since the flush was not awaited, the following API calls are queued.
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
}
|
||||
|
||||
// We do not need to flush the final bytes, since the incomplete flush will pick it up.
|
||||
Assert.Equal(2, mockPipeWriter.GetMemoryCallCount);
|
||||
|
|
@ -115,6 +128,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
|
||||
mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
pipeWriterFlushTcsArray[1].SetResult(default);
|
||||
|
||||
await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
|
||||
|
||||
// Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times.
|
||||
|
|
@ -126,7 +140,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
Assert.False(flushTask1.IsCompleted);
|
||||
|
||||
var completeEx = new Exception();
|
||||
await concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
lock (sync)
|
||||
{
|
||||
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
}
|
||||
|
||||
await completeTask.DefaultTimeout();
|
||||
|
||||
// Complete isn't called on the inner PipeWriter until the inner flushes have completed.
|
||||
Assert.Null(mockPipeWriter.CompleteException);
|
||||
|
|
@ -151,21 +170,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
|
||||
};
|
||||
|
||||
var sync = new object();
|
||||
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
|
||||
var memory = default(Memory<byte>);
|
||||
var flushTask0 = default(ValueTask<FlushResult>);
|
||||
var flushTask1 = default(ValueTask<FlushResult>);
|
||||
var completeTask = default(ValueTask);
|
||||
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
lock (sync)
|
||||
{
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
|
||||
var flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
|
||||
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
}
|
||||
|
||||
// If the inner flush completes between a call to GetMemory() and Advance(), the outer
|
||||
// flush completes, and the next flush will pick up the buffered data.
|
||||
|
|
@ -177,11 +204,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
lock (sync)
|
||||
{
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
|
||||
var flushTask1 = concurrentPipeWriter.FlushAsync();
|
||||
flushTask1 = concurrentPipeWriter.FlushAsync();
|
||||
}
|
||||
|
||||
// Now that we flushed the ConcurrentPipeWriter again, the GetMemory() and Advance() calls are replayed.
|
||||
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
|
||||
|
|
@ -201,7 +231,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
Assert.Equal(2, mockPipeWriter.FlushCallCount);
|
||||
|
||||
var completeEx = new Exception();
|
||||
await concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
|
||||
lock (sync)
|
||||
{
|
||||
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
}
|
||||
|
||||
await completeTask.DefaultTimeout();
|
||||
|
||||
Assert.Same(completeEx, mockPipeWriter.CompleteException);
|
||||
}
|
||||
}
|
||||
|
|
@ -217,21 +254,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
|
||||
};
|
||||
|
||||
var sync = new object();
|
||||
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
|
||||
var memory = default(Memory<byte>);
|
||||
var flushTask0 = default(ValueTask<FlushResult>);
|
||||
var completeTask = default(ValueTask);
|
||||
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
lock (sync)
|
||||
{
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
|
||||
var flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
|
||||
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
}
|
||||
|
||||
// If the inner flush completes between a call to GetMemory() and Advance(), the outer
|
||||
// flush completes, and the next flush will pick up the buffered data.
|
||||
|
|
@ -243,13 +287,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
|
||||
// Complete the ConcurrentPipeWriter without flushing any of the queued data.
|
||||
var completeEx = new Exception();
|
||||
await concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
|
||||
lock (sync)
|
||||
{
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
|
||||
// Complete the ConcurrentPipeWriter without flushing any of the queued data.
|
||||
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
}
|
||||
|
||||
await completeTask.DefaultTimeout();
|
||||
|
||||
// Now that we completed the ConcurrentPipeWriter, the GetMemory() and Advance() calls are replayed.
|
||||
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
|
||||
|
|
@ -272,45 +322,58 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
|
||||
};
|
||||
|
||||
var sync = new object();
|
||||
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
|
||||
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
|
||||
var flushTask0 = default(ValueTask<FlushResult>);
|
||||
var flushTask1 = default(ValueTask<FlushResult>);
|
||||
var flushTask2 = default(ValueTask<FlushResult>);
|
||||
var completeTask = default(ValueTask);
|
||||
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
lock (sync)
|
||||
{
|
||||
var memory = concurrentPipeWriter.GetMemory();
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
|
||||
var flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
flushTask0 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
|
||||
// Since the flush was not awaited, the following API calls are queued.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
var flushTask1 = concurrentPipeWriter.FlushAsync();
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
// Since the flush was not awaited, the following API calls are queued.
|
||||
memory = concurrentPipeWriter.GetMemory();
|
||||
concurrentPipeWriter.Advance(memory.Length);
|
||||
flushTask1 = concurrentPipeWriter.FlushAsync();
|
||||
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
Assert.False(flushTask1.IsCompleted);
|
||||
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
|
||||
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
|
||||
Assert.Equal(1, mockPipeWriter.FlushCallCount);
|
||||
|
||||
// CancelPendingFlush() does not get queued.
|
||||
concurrentPipeWriter.CancelPendingFlush();
|
||||
Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount);
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
Assert.False(flushTask1.IsCompleted);
|
||||
|
||||
// CancelPendingFlush() does not get queued.
|
||||
concurrentPipeWriter.CancelPendingFlush();
|
||||
Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount);
|
||||
}
|
||||
|
||||
pipeWriterFlushTcsArray[0].SetResult(new FlushResult(isCanceled: true, isCompleted: false));
|
||||
|
||||
Assert.True((await flushTask0.DefaultTimeout()).IsCanceled);
|
||||
Assert.True((await flushTask1.DefaultTimeout()).IsCanceled);
|
||||
|
||||
var flushTask2 = concurrentPipeWriter.FlushAsync();
|
||||
lock (sync)
|
||||
{
|
||||
flushTask2 = concurrentPipeWriter.FlushAsync();
|
||||
}
|
||||
|
||||
Assert.False(flushTask2.IsCompleted);
|
||||
|
||||
pipeWriterFlushTcsArray[1].SetResult(default);
|
||||
|
||||
await flushTask2.DefaultTimeout();
|
||||
|
||||
// We do not need to flush the final bytes, since the incomplete flush will pick it up.
|
||||
|
|
@ -319,7 +382,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
Assert.Equal(2, mockPipeWriter.FlushCallCount);
|
||||
|
||||
var completeEx = new Exception();
|
||||
await concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
|
||||
lock (sync)
|
||||
{
|
||||
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
|
||||
}
|
||||
|
||||
await completeTask.DefaultTimeout();
|
||||
|
||||
Assert.Same(completeEx, mockPipeWriter.CompleteException);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue