Prevent enumeration of queue during modification in SocketOutputTests

This commit is contained in:
Stephen Halter 2016-07-14 15:12:47 -07:00
parent 0c52529b97
commit fbcb5dcb1b
1 changed files with 60 additions and 19 deletions

View File

@ -2,10 +2,10 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel;
using Microsoft.AspNetCore.Server.Kestrel.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
@ -66,7 +66,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
var mockLibuv = new MockLibuv
@ -87,7 +87,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -111,7 +112,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
// Too many bytes are already pre-completed for the second write to pre-complete.
Assert.False(completedWh.Wait(1000));
// Act
completeQueue.Dequeue()(0);
Action<int> triggerNextCompleted;
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
// Assert
// Finishing the first write should allow the second write to pre-complete.
Assert.True(completedWh.Wait(1000));
@ -120,6 +123,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var cleanupTask = socketOutput.WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
Assert.True(mockConnection.SocketClosed.Wait(1000));
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
@ -132,7 +138,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
var completeQueue = new ConcurrentQueue<Action<int>>();
var writeRequestedWh = new ManualResetEventSlim();
// Arrange
@ -155,7 +161,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted / 2;
var data = new byte[bufferSize];
@ -185,7 +192,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
Assert.Equal(2, completeQueue.Count);
// Act
completeQueue.Dequeue()(0);
Action<int> triggerNextCompleted;
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
// Assert
// Finishing the first write should allow the second write to pre-complete.
@ -195,6 +204,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var cleanupTask = socketOutput.WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
Assert.True(mockConnection.SocketClosed.Wait(1000));
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
@ -207,7 +219,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
var mockLibuv = new MockLibuv
@ -298,12 +310,19 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
Assert.False(task6Success.IsCanceled);
Assert.False(task6Success.IsFaulted);
Assert.True(true);
// Cleanup
var cleanupTask = ((SocketOutput)socketOutput).WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Allow for the socketDisconnect command to get posted to the libuv thread.
// Right now, the up to three pending writes are holding it up.
Action<int> triggerNextCompleted;
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
Assert.True(mockConnection.SocketClosed.Wait(1000));
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
@ -317,7 +336,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
var mockLibuv = new MockLibuv
@ -376,7 +395,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
Assert.False(task3Canceled.IsFaulted);
// Cause the first write to fail.
completeQueue.Dequeue()(-1);
Action<int> triggerNextCompleted;
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(-1);
// Second task is now completed
await task2Success;
@ -389,6 +410,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var cleanupTask = ((SocketOutput)socketOutput).WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
Assert.True(mockConnection.SocketClosed.Wait(1000));
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
@ -402,7 +426,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
var completeQueue = new ConcurrentQueue<Action<int>>();
var onWriteWh = new ManualResetEventSlim();
// Arrange
@ -426,7 +450,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -460,7 +485,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
socketOutput.WriteAsync(buffer, default(CancellationToken)).ContinueWith(onCompleted2);
Assert.True(onWriteWh.Wait(1000));
completeQueue.Dequeue()(0);
Action<int> triggerNextCompleted;
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
// Assert
// Too many bytes are already pre-completed for the third but not the second write to pre-complete.
@ -469,7 +496,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
Assert.False(completedWh2.Wait(1000));
// Act
completeQueue.Dequeue()(0);
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
// Assert
// Finishing the first write should allow the second write to pre-complete.
@ -479,6 +507,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var cleanupTask = ((SocketOutput)socketOutput).WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
Assert.True(mockConnection.SocketClosed.Wait(1000));
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
@ -543,7 +574,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
public void OnlyAllowsUpToThreeConcurrentWrites()
{
var writeWh = new ManualResetEventSlim();
var completeQueue = new Queue<Action<int>>();
var completeQueue = new ConcurrentQueue<Action<int>>();
var mockLibuv = new MockLibuv
{
@ -564,7 +595,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var mockConnection = new MockConnection();
var socketOutput = new SocketOutput(kestrelThread, socket, memory, mockConnection, "0", trace, ltp, new Queue<UvWriteReq>());
var buffer = new ArraySegment<byte>(new byte[1]);
@ -584,13 +616,22 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
Assert.False(writeWh.Wait(1000));
// Complete 1st write allowing uv_write to be triggered again
completeQueue.Dequeue()(0);
Action<int> triggerNextCompleted;
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
Assert.True(writeWh.Wait(1000));
// Cleanup
var cleanupTask = socketOutput.WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Allow for the socketDisconnect command to get posted to the libuv thread.
// Right now, the three pending writes are holding it up.
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
Assert.True(mockConnection.SocketClosed.Wait(1000));
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);