Improve SocketOutputTests
- This should increase reliability/determinism by removing timeouts.
This commit is contained in:
parent
08a91f17bf
commit
8f4cc3003b
|
|
@ -3,13 +3,11 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking;
|
||||
using Microsoft.AspNetCore.Server.KestrelTests.TestHelpers;
|
||||
using Xunit;
|
||||
|
||||
|
|
@ -18,7 +16,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
public class SocketOutputTests
|
||||
{
|
||||
[Fact]
|
||||
public void CanWrite1MB()
|
||||
public async Task CanWrite1MB()
|
||||
{
|
||||
// This test was added because when initially implementing write-behind buffering in
|
||||
// SocketOutput, the write callback would never be invoked for writes larger than
|
||||
|
|
@ -33,25 +31,19 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
|
||||
|
||||
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
|
||||
var bufferSize = 1048576;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
var completedWh = new ManualResetEventSlim();
|
||||
|
||||
// Act
|
||||
socketOutput.WriteAsync(buffer, default(CancellationToken)).ContinueWith(
|
||||
(t) =>
|
||||
{
|
||||
Assert.Null(t.Exception);
|
||||
completedWh.Set();
|
||||
}
|
||||
);
|
||||
var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
// Assert
|
||||
Assert.True(completedWh.Wait(1000));
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask.Status);
|
||||
|
||||
// Cleanup
|
||||
var cleanupTask = socketOutput.WriteAsync(
|
||||
|
|
@ -60,7 +52,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyPreCompleted()
|
||||
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyBuffered()
|
||||
{
|
||||
// This should match _maxBytesPreCompleted in SocketOutput
|
||||
var maxBytesPreCompleted = 65536;
|
||||
|
|
@ -83,45 +75,43 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var mockConnection = new MockConnection();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
|
||||
|
||||
var bufferSize = maxBytesPreCompleted;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
var completedWh = new ManualResetEventSlim();
|
||||
Action<Task> onCompleted = (Task t) =>
|
||||
{
|
||||
Assert.Null(t.Exception);
|
||||
completedWh.Set();
|
||||
};
|
||||
|
||||
// Act
|
||||
socketOutput.WriteAsync(buffer, default(CancellationToken)).ContinueWith(onCompleted);
|
||||
var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken));
|
||||
|
||||
// Assert
|
||||
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
|
||||
Assert.True(completedWh.Wait(1000));
|
||||
// Arrange
|
||||
completedWh.Reset();
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status);
|
||||
|
||||
// Act
|
||||
socketOutput.WriteAsync(buffer, default(CancellationToken)).ContinueWith(onCompleted);
|
||||
var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
// Assert
|
||||
// Too many bytes are already pre-completed for the second write to pre-complete.
|
||||
Assert.False(completedWh.Wait(1000));
|
||||
Assert.False(writeTask2.IsCompleted);
|
||||
|
||||
// Act
|
||||
Action<int> triggerNextCompleted;
|
||||
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
|
||||
triggerNextCompleted(0);
|
||||
|
||||
// Assert
|
||||
// Finishing the first write should allow the second write to pre-complete.
|
||||
Assert.True(completedWh.Wait(1000));
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask2.Status);
|
||||
|
||||
// Cleanup
|
||||
var cleanupTask = socketOutput.WriteAsync(
|
||||
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
|
||||
|
||||
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
|
||||
Assert.True(mockConnection.SocketClosed.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
foreach (var triggerCompleted in completeQueue)
|
||||
{
|
||||
|
|
@ -131,12 +121,12 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted()
|
||||
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyBuffered()
|
||||
{
|
||||
// This should match _maxBytesPreCompleted in SocketOutput
|
||||
var maxBytesPreCompleted = 65536;
|
||||
var completeQueue = new ConcurrentQueue<Action<int>>();
|
||||
var writeRequestedWh = new ManualResetEventSlim();
|
||||
var writeRequested = false;
|
||||
|
||||
// Arrange
|
||||
var mockLibuv = new MockLibuv
|
||||
|
|
@ -144,7 +134,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
OnWrite = (socket, buffers, triggerCompleted) =>
|
||||
{
|
||||
completeQueue.Enqueue(triggerCompleted);
|
||||
writeRequestedWh.Set();
|
||||
writeRequested = true;
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
|
@ -156,7 +146,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var mockConnection = new MockConnection();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
|
||||
|
||||
|
|
@ -166,11 +156,13 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
|
||||
// Act
|
||||
var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
|
||||
|
||||
// Assert
|
||||
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status);
|
||||
Assert.True(writeRequestedWh.Wait(1000));
|
||||
writeRequestedWh.Reset();
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.True(writeRequested);
|
||||
writeRequested = false;
|
||||
|
||||
// Add more bytes to the write-behind buffer to prevent the next write from
|
||||
var iter = socketOutput.ProducingStart();
|
||||
|
|
@ -179,9 +171,11 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
|
||||
// Act
|
||||
var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
|
||||
|
||||
// Assert
|
||||
// Too many bytes are already pre-completed for the fourth write to pre-complete.
|
||||
Assert.True(writeRequestedWh.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.True(writeRequested);
|
||||
Assert.False(writeTask2.IsCompleted);
|
||||
|
||||
// 2 calls have been made to uv_write
|
||||
|
|
@ -194,14 +188,14 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
|
||||
// Assert
|
||||
// Finishing the first write should allow the second write to pre-complete.
|
||||
Assert.True(writeTask2.Wait(1000));
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask2.Status);
|
||||
|
||||
// Cleanup
|
||||
var cleanupTask = socketOutput.WriteAsync(
|
||||
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
|
||||
|
||||
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
|
||||
Assert.True(mockConnection.SocketClosed.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
foreach (var triggerCompleted in completeQueue)
|
||||
{
|
||||
|
|
@ -234,7 +228,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
|
||||
using (var mockConnection = new MockConnection())
|
||||
{
|
||||
|
|
@ -261,7 +255,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var task3Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken));
|
||||
|
||||
// Give time for tasks to percolate
|
||||
await Task.Delay(1000);
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
// Second task is not completed
|
||||
Assert.False(task2Throw.IsCompleted);
|
||||
|
|
@ -316,7 +310,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
triggerNextCompleted(0);
|
||||
|
||||
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
|
||||
Assert.True(mockConnection.SocketClosed.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
foreach (var triggerCompleted in completeQueue)
|
||||
{
|
||||
|
|
@ -350,7 +344,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
|
||||
using (var mockConnection = new MockConnection())
|
||||
{
|
||||
|
|
@ -376,7 +370,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token);
|
||||
|
||||
// Give time for tasks to percolate
|
||||
await Task.Delay(1000);
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
// Second task is not completed
|
||||
Assert.False(task2Success.IsCompleted);
|
||||
|
|
@ -405,7 +399,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
|
||||
|
||||
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
|
||||
Assert.True(mockConnection.SocketClosed.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
foreach (var triggerCompleted in completeQueue)
|
||||
{
|
||||
|
|
@ -416,12 +410,12 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void WritesDontGetCompletedTooQuickly()
|
||||
public async Task WritesDontGetCompletedTooQuickly()
|
||||
{
|
||||
// This should match _maxBytesPreCompleted in SocketOutput
|
||||
var maxBytesPreCompleted = 65536;
|
||||
var completeQueue = new ConcurrentQueue<Action<int>>();
|
||||
var onWriteWh = new ManualResetEventSlim();
|
||||
var writeCalled = false;
|
||||
|
||||
// Arrange
|
||||
var mockLibuv = new MockLibuv
|
||||
|
|
@ -429,7 +423,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
OnWrite = (socket, buffers, triggerCompleted) =>
|
||||
{
|
||||
completeQueue.Enqueue(triggerCompleted);
|
||||
onWriteWh.Set();
|
||||
writeCalled = true;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -442,42 +436,30 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var mockConnection = new MockConnection();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
|
||||
|
||||
var bufferSize = maxBytesPreCompleted;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
||||
var completedWh = new ManualResetEventSlim();
|
||||
Action<Task> onCompleted = (Task t) =>
|
||||
{
|
||||
Assert.Null(t.Exception);
|
||||
completedWh.Set();
|
||||
};
|
||||
|
||||
var completedWh2 = new ManualResetEventSlim();
|
||||
Action<Task> onCompleted2 = (Task t) =>
|
||||
{
|
||||
Assert.Null(t.Exception);
|
||||
completedWh2.Set();
|
||||
};
|
||||
|
||||
// Act (Pre-complete the maximum number of bytes in preparation for the rest of the test)
|
||||
socketOutput.WriteAsync(buffer, default(CancellationToken)).ContinueWith(onCompleted);
|
||||
var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken));
|
||||
|
||||
// Assert
|
||||
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
|
||||
Assert.True(completedWh.Wait(1000));
|
||||
Assert.True(onWriteWh.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status);
|
||||
Assert.True(writeCalled);
|
||||
// Arrange
|
||||
completedWh.Reset();
|
||||
onWriteWh.Reset();
|
||||
writeCalled = false;
|
||||
|
||||
// Act
|
||||
socketOutput.WriteAsync(buffer, default(CancellationToken)).ContinueWith(onCompleted);
|
||||
socketOutput.WriteAsync(buffer, default(CancellationToken)).ContinueWith(onCompleted2);
|
||||
var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken));
|
||||
var writeTask3 = socketOutput.WriteAsync(buffer, default(CancellationToken));
|
||||
|
||||
Assert.True(onWriteWh.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.True(writeCalled);
|
||||
Action<int> triggerNextCompleted;
|
||||
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
|
||||
triggerNextCompleted(0);
|
||||
|
|
@ -485,23 +467,23 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
// Assert
|
||||
// Too many bytes are already pre-completed for the third but not the second write to pre-complete.
|
||||
// https://github.com/aspnet/KestrelHttpServer/issues/356
|
||||
Assert.True(completedWh.Wait(1000));
|
||||
Assert.False(completedWh2.Wait(1000));
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask2.Status);
|
||||
Assert.False(writeTask3.IsCompleted);
|
||||
|
||||
// Act
|
||||
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
|
||||
triggerNextCompleted(0);
|
||||
|
||||
// Assert
|
||||
// Finishing the first write should allow the second write to pre-complete.
|
||||
Assert.True(completedWh2.Wait(1000));
|
||||
// Finishing the first write should allow the third write to pre-complete.
|
||||
Assert.Equal(TaskStatus.RanToCompletion, writeTask3.Status);
|
||||
|
||||
// Cleanup
|
||||
var cleanupTask = ((SocketOutput)socketOutput).WriteAsync(
|
||||
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
|
||||
|
||||
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
|
||||
Assert.True(mockConnection.SocketClosed.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
foreach (var triggerCompleted in completeQueue)
|
||||
{
|
||||
|
|
@ -511,17 +493,15 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void ProducingStartAndProducingCompleteCanBeUsedDirectly()
|
||||
public async Task ProducingStartAndProducingCompleteCanBeUsedDirectly()
|
||||
{
|
||||
int nBuffers = 0;
|
||||
var nBufferWh = new ManualResetEventSlim();
|
||||
|
||||
var mockLibuv = new MockLibuv
|
||||
{
|
||||
OnWrite = (socket, buffers, triggerCompleted) =>
|
||||
{
|
||||
nBuffers = buffers;
|
||||
nBufferWh.Set();
|
||||
triggerCompleted(0);
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -534,7 +514,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
|
||||
|
||||
// block 1
|
||||
|
|
@ -551,9 +531,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
socketOutput.ProducingComplete(end);
|
||||
|
||||
// A call to Write is required to ensure a write is scheduled
|
||||
socketOutput.WriteAsync(default(ArraySegment<byte>), default(CancellationToken));
|
||||
var ignore = socketOutput.WriteAsync(default(ArraySegment<byte>), default(CancellationToken));
|
||||
|
||||
Assert.True(nBufferWh.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.Equal(2, nBuffers);
|
||||
|
||||
// Cleanup
|
||||
|
|
@ -563,16 +543,16 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void OnlyAllowsUpToThreeConcurrentWrites()
|
||||
public async Task OnlyAllowsUpToThreeConcurrentWrites()
|
||||
{
|
||||
var writeWh = new ManualResetEventSlim();
|
||||
var writeCalled = false;
|
||||
var completeQueue = new ConcurrentQueue<Action<int>>();
|
||||
|
||||
var mockLibuv = new MockLibuv
|
||||
{
|
||||
OnWrite = (socket, buffers, triggerCompleted) =>
|
||||
{
|
||||
writeWh.Set();
|
||||
writeCalled = true;
|
||||
completeQueue.Enqueue(triggerCompleted);
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -585,43 +565,44 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var mockConnection = new MockConnection();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
|
||||
|
||||
var buffer = new ArraySegment<byte>(new byte[1]);
|
||||
|
||||
// First three writes trigger uv_write
|
||||
socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
Assert.True(writeWh.Wait(1000));
|
||||
writeWh.Reset();
|
||||
socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
Assert.True(writeWh.Wait(1000));
|
||||
writeWh.Reset();
|
||||
socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
Assert.True(writeWh.Wait(1000));
|
||||
writeWh.Reset();
|
||||
var ignore = socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.True(writeCalled);
|
||||
writeCalled = false;
|
||||
ignore = socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.True(writeCalled);
|
||||
writeCalled = false;
|
||||
ignore = socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.True(writeCalled);
|
||||
writeCalled = false;
|
||||
|
||||
// The fourth write won't trigger uv_write since the first three haven't completed
|
||||
socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
Assert.False(writeWh.Wait(1000));
|
||||
ignore = socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.False(writeCalled);
|
||||
|
||||
// Complete 1st write allowing uv_write to be triggered again
|
||||
Action<int> triggerNextCompleted;
|
||||
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
|
||||
triggerNextCompleted(0);
|
||||
Assert.True(writeWh.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.True(writeCalled);
|
||||
|
||||
// Cleanup
|
||||
var cleanupTask = socketOutput.WriteAsync(
|
||||
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
|
||||
|
||||
// Allow for the socketDisconnect command to get posted to the libuv thread.
|
||||
// Right now, the three pending writes are holding it up.
|
||||
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
|
||||
triggerNextCompleted(0);
|
||||
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
|
||||
Assert.True(mockConnection.SocketClosed.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
foreach (var triggerCompleted in completeQueue)
|
||||
{
|
||||
|
|
@ -631,9 +612,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void WritesAreAggregated()
|
||||
public async Task WritesAreAggregated()
|
||||
{
|
||||
var writeWh = new ManualResetEventSlim();
|
||||
var writeCalled = false;
|
||||
var writeCount = 0;
|
||||
|
||||
var mockLibuv = new MockLibuv
|
||||
|
|
@ -642,7 +623,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
{
|
||||
writeCount++;
|
||||
triggerCompleted(0);
|
||||
writeWh.Set();
|
||||
writeCalled = true;
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
|
@ -654,32 +635,32 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
|
||||
|
||||
var blockThreadWh = new ManualResetEventSlim();
|
||||
kestrelThread.Post(_ =>
|
||||
{
|
||||
blockThreadWh.Wait();
|
||||
}, state: null);
|
||||
mockLibuv.KestrelThreadBlocker.Reset();
|
||||
|
||||
var buffer = new ArraySegment<byte>(new byte[1]);
|
||||
|
||||
// Two calls to WriteAsync trigger uv_write once if both calls
|
||||
// are made before write is scheduled
|
||||
socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
var ignore = socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
ignore = socketOutput.WriteAsync(buffer, CancellationToken.None);
|
||||
|
||||
blockThreadWh.Set();
|
||||
mockLibuv.KestrelThreadBlocker.Set();
|
||||
|
||||
Assert.True(writeWh.Wait(1000));
|
||||
writeWh.Reset();
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
Assert.True(writeCalled);
|
||||
writeCalled = false;
|
||||
|
||||
// Write isn't called twice after the thread is unblocked
|
||||
Assert.False(writeWh.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
Assert.False(writeCalled);
|
||||
// One call to ScheduleWrite
|
||||
Assert.Equal(1, mockLibuv.PostCount);
|
||||
// One call to uv_write
|
||||
Assert.Equal(1, writeCount);
|
||||
// One call to ScheduleWrite + One call to Post to block the thread
|
||||
Assert.Equal(2, mockLibuv.PostCount);
|
||||
|
||||
// Cleanup
|
||||
var cleanupTask = socketOutput.WriteAsync(
|
||||
|
|
@ -688,7 +669,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void ProducingStartAndProducingCompleteCanBeCalledAfterConnectionClose()
|
||||
public async Task ProducingStartAndProducingCompleteCanBeCalledAfterConnectionClose()
|
||||
{
|
||||
var mockLibuv = new MockLibuv();
|
||||
|
||||
|
|
@ -699,7 +680,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var kestrelThread = kestrelEngine.Threads[0];
|
||||
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
|
||||
var trace = new KestrelTrace(new TestKestrelTrace());
|
||||
var ltp = new LoggingThreadPool(trace);
|
||||
var ltp = new SynchronousThreadPool();
|
||||
var connection = new MockConnection();
|
||||
var socketOutput = new SocketOutput(kestrelThread, socket, connection, "0", trace, ltp);
|
||||
|
||||
|
|
@ -707,7 +688,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
|
|||
var cleanupTask = socketOutput.WriteAsync(
|
||||
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
|
||||
|
||||
Assert.True(connection.SocketClosed.Wait(1000));
|
||||
await mockLibuv.OnPostTask;
|
||||
|
||||
Assert.Equal(TaskStatus.RanToCompletion, connection.SocketClosed.Status);
|
||||
|
||||
var start = socketOutput.ProducingStart();
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
|
||||
|
|
@ -11,6 +12,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
|
|||
{
|
||||
private UvAsyncHandle _postHandle;
|
||||
private uv_async_cb _onPost;
|
||||
private TaskCompletionSource<object> _onPostTcs;
|
||||
private object _postLock = new object();
|
||||
|
||||
private bool _stopLoop;
|
||||
private readonly ManualResetEventSlim _loopWh = new ManualResetEventSlim();
|
||||
|
|
@ -32,8 +35,17 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
|
|||
|
||||
_uv_async_send = postHandle =>
|
||||
{
|
||||
PostCount++;
|
||||
_loopWh.Set();
|
||||
lock (_postLock)
|
||||
{
|
||||
if (_onPostTcs == null || _onPostTcs.Task.IsCompleted)
|
||||
{
|
||||
_onPostTcs = new TaskCompletionSource<object>();
|
||||
}
|
||||
|
||||
PostCount++;
|
||||
|
||||
_loopWh.Set();
|
||||
}
|
||||
|
||||
return 0;
|
||||
};
|
||||
|
|
@ -51,8 +63,14 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
|
|||
while (!_stopLoop)
|
||||
{
|
||||
_loopWh.Wait();
|
||||
_loopWh.Reset();
|
||||
_onPost(_postHandle.InternalGetHandle());
|
||||
KestrelThreadBlocker.Wait();
|
||||
|
||||
lock (_postLock)
|
||||
{
|
||||
_loopWh.Reset();
|
||||
_onPost(_postHandle.InternalGetHandle());
|
||||
_onPostTcs.TrySetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
@ -97,6 +115,10 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
|
|||
|
||||
public int PostCount { get; set; }
|
||||
|
||||
public Task OnPostTask => _onPostTcs.Task;
|
||||
|
||||
public ManualResetEventSlim KestrelThreadBlocker { get; } = new ManualResetEventSlim(true);
|
||||
|
||||
private int UvReadStart(UvStreamHandle handle, uv_alloc_cb allocCallback, uv_read_cb readCallback)
|
||||
{
|
||||
AllocCallback = allocCallback;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
|
||||
{
|
||||
public class SynchronousThreadPool : IThreadPool
|
||||
{
|
||||
public void Complete(TaskCompletionSource<object> tcs)
|
||||
{
|
||||
tcs.TrySetResult(null);
|
||||
}
|
||||
|
||||
public void Cancel(TaskCompletionSource<object> tcs)
|
||||
{
|
||||
tcs.TrySetCanceled();
|
||||
}
|
||||
|
||||
public void Error(TaskCompletionSource<object> tcs, Exception ex)
|
||||
{
|
||||
tcs.TrySetException(ex);
|
||||
}
|
||||
|
||||
public void Run(Action action)
|
||||
{
|
||||
action();
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue