// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel; using Microsoft.AspNetCore.Server.Kestrel.Internal; using Microsoft.AspNetCore.Server.Kestrel.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure; using Microsoft.AspNetCore.Server.KestrelTests.TestHelpers; using Microsoft.AspNetCore.Testing; using Xunit; namespace Microsoft.AspNetCore.Server.KestrelTests { public class SocketOutputTests { public static TheoryData MaxResponseBufferSizeData => new TheoryData { new KestrelServerOptions(), new KestrelServerOptions { Limits = { MaxResponseBufferSize = 0 } }, new KestrelServerOptions { Limits = { MaxResponseBufferSize = 1024 } }, new KestrelServerOptions { Limits = { MaxResponseBufferSize = 1024 * 1024 } }, new KestrelServerOptions { Limits = { MaxResponseBufferSize = null } }, }; public static TheoryData PositiveMaxResponseBufferSizeData => new TheoryData { new KestrelServerOptions(), new KestrelServerOptions { Limits = { MaxResponseBufferSize = 1024 } }, new KestrelServerOptions { Limits = { MaxResponseBufferSize = 1024 * 1024 } } }; [Theory] [MemberData(nameof(MaxResponseBufferSizeData))] public async Task CanWrite1MB(KestrelServerOptions options) { // This test was added because when initially implementing write-behind buffering in // SocketOutput, the write callback would never be invoked for writes larger than // _maxBytesPreCompleted even after the write actually completed. // Arrange var mockLibuv = new MockLibuv(); using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp); // At least one run of this test should have a MaxResponseBufferSize < 1 MB. var bufferSize = 1024 * 1024; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); await mockLibuv.OnPostTask; // Assert Assert.Equal(TaskStatus.RanToCompletion, writeTask.Status); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); } } [Fact] public async Task NullMaxResponseBufferSizeAllowsUnlimitedBuffer() { var completeQueue = new ConcurrentQueue>(); // Arrange var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var options = new KestrelServerOptions { Limits = { MaxResponseBufferSize = null } }; var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp); // Don't want to allocate anything too huge for perf. This is at least larger than the default buffer. var bufferSize = 1024 * 1024; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); // Assert Assert.Equal(TaskStatus.RanToCompletion, writeTask.Status); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } [Fact] public async Task ZeroMaxResponseBufferSizeDisablesBuffering() { var completeQueue = new ConcurrentQueue>(); // Arrange var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var options = new KestrelServerOptions { Limits = { MaxResponseBufferSize = 0 } }; var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp); var bufferSize = 1; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); // Assert Assert.False(writeTask.IsCompleted); // Act await mockLibuv.OnPostTask; // Finishing the write should allow the task to complete. Action triggerNextCompleted; Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(0); // Assert Assert.Equal(TaskStatus.RanToCompletion, writeTask.Status); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyBuffered(KestrelServerOptions options) { var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value; var completeQueue = new ConcurrentQueue>(); // Arrange var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var mockConnection = new MockConnection(options); var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken)); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); // Act var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken)); await mockLibuv.OnPostTask; // Assert // Too many bytes are already pre-completed for the second write to pre-complete. Assert.False(writeTask2.IsCompleted); // Act Action triggerNextCompleted; Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(0); // Assert // Finishing the first write should allow the second write to pre-complete. Assert.Equal(TaskStatus.RanToCompletion, writeTask2.Status); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyBuffered(KestrelServerOptions options) { var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value; var completeQueue = new ConcurrentQueue>(); var writeRequested = false; // Arrange var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); writeRequested = true; return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var mockConnection = new MockConnection(options); var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp); var bufferSize = maxBytesPreCompleted / 2; var data = new byte[bufferSize]; var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); // Act var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); await mockLibuv.OnPostTask; Assert.True(writeRequested); writeRequested = false; // Add more bytes to the write-behind buffer to prevent the next write from var iter = socketOutput.ProducingStart(); iter.CopyFrom(halfWriteBehindBuffer); socketOutput.ProducingComplete(iter); // Act var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); // Assert // Too many bytes are already pre-completed for the fourth write to pre-complete. await mockLibuv.OnPostTask; Assert.True(writeRequested); Assert.False(writeTask2.IsCompleted); // 2 calls have been made to uv_write Assert.Equal(2, completeQueue.Count); // Act Action triggerNextCompleted; Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(0); // Assert // Finishing the first write should allow the second write to pre-complete. Assert.Equal(TaskStatus.RanToCompletion, writeTask2.Status); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task OnlyWritesRequestingCancellationAreErroredOnCancellation(KestrelServerOptions options) { var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value; var completeQueue = new ConcurrentQueue>(); // Arrange var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); using (var mockConnection = new MockConnection(options)) { ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp); var bufferSize = maxBytesPreCompleted; var data = new byte[bufferSize]; var fullBuffer = new ArraySegment(data, 0, bufferSize); var cts = new CancellationTokenSource(); // Act var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: cts.Token); // task1 should complete successfully as < _maxBytesPreCompleted // First task is completed and successful Assert.True(task1Success.IsCompleted); Assert.False(task1Success.IsCanceled); Assert.False(task1Success.IsFaulted); // following tasks should wait. var task2Throw = socketOutput.WriteAsync(fullBuffer, cancellationToken: cts.Token); var task3Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken)); // Give time for tasks to percolate await mockLibuv.OnPostTask; // Second task is not completed Assert.False(task2Throw.IsCompleted); Assert.False(task2Throw.IsCanceled); Assert.False(task2Throw.IsFaulted); // Third task is not completed Assert.False(task3Success.IsCompleted); Assert.False(task3Success.IsCanceled); Assert.False(task3Success.IsFaulted); cts.Cancel(); // Second task is now canceled await Assert.ThrowsAsync(() => task2Throw); Assert.True(task2Throw.IsCanceled); // Third task is now completed await task3Success; // Fourth task immediately cancels as the token is canceled var task4Throw = socketOutput.WriteAsync(fullBuffer, cancellationToken: cts.Token); Assert.True(task4Throw.IsCompleted); Assert.True(task4Throw.IsCanceled); Assert.False(task4Throw.IsFaulted); var task5Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken)); // task5 should complete immediately Assert.True(task5Success.IsCompleted); Assert.False(task5Success.IsCanceled); Assert.False(task5Success.IsFaulted); cts = new CancellationTokenSource(); var task6Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: cts.Token); // task6 should complete immediately but not cancel as its cancellation token isn't set Assert.True(task6Success.IsCompleted); Assert.False(task6Success.IsCanceled); Assert.False(task6Success.IsFaulted); // Cleanup var cleanupTask = ((SocketOutput)socketOutput).WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Allow for the socketDisconnect command to get posted to the libuv thread. // Right now, the up to three pending writes are holding it up. Action triggerNextCompleted; Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(0); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task FailedWriteCompletesOrCancelsAllPendingTasks(KestrelServerOptions options) { var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value; var completeQueue = new ConcurrentQueue>(); // Arrange var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); using (var mockConnection = new MockConnection(options)) { var abortedSource = mockConnection.RequestAbortedSource; ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp); var bufferSize = maxBytesPreCompleted; var data = new byte[bufferSize]; var fullBuffer = new ArraySegment(data, 0, bufferSize); // Act var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // task1 should complete successfully as < _maxBytesPreCompleted // First task is completed and successful Assert.True(task1Success.IsCompleted); Assert.False(task1Success.IsCanceled); Assert.False(task1Success.IsFaulted); // following tasks should wait. var task2Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken)); var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await mockLibuv.OnPostTask; // Second task is not completed Assert.False(task2Success.IsCompleted); Assert.False(task2Success.IsCanceled); Assert.False(task2Success.IsFaulted); // Third task is not completed Assert.False(task3Canceled.IsCompleted); Assert.False(task3Canceled.IsCanceled); Assert.False(task3Canceled.IsFaulted); // Cause the first write to fail. Action triggerNextCompleted; Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(-1); // Second task is now completed await task2Success; // Third task is now canceled await Assert.ThrowsAsync(() => task3Canceled); Assert.True(task3Canceled.IsCanceled); // Cleanup var cleanupTask = ((SocketOutput)socketOutput).WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } } [Theory] [MemberData(nameof(PositiveMaxResponseBufferSizeData))] public async Task WritesDontGetCompletedTooQuickly(KestrelServerOptions options) { var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value; var completeQueue = new ConcurrentQueue>(); var writeCalled = false; // Arrange var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); writeCalled = true; return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var mockConnection = new MockConnection(options); var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act (Pre-complete the maximum number of bytes in preparation for the rest of the test) var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken)); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. await mockLibuv.OnPostTask; Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); Assert.True(writeCalled); // Arrange writeCalled = false; // Act var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken)); var writeTask3 = socketOutput.WriteAsync(buffer, default(CancellationToken)); await mockLibuv.OnPostTask; Assert.True(writeCalled); Action triggerNextCompleted; Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(0); // Assert // Too many bytes are already pre-completed for the third but not the second write to pre-complete. // https://github.com/aspnet/KestrelHttpServer/issues/356 Assert.Equal(TaskStatus.RanToCompletion, writeTask2.Status); Assert.False(writeTask3.IsCompleted); // Act Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(0); // Assert // Finishing the first write should allow the third write to pre-complete. Assert.Equal(TaskStatus.RanToCompletion, writeTask3.Status); // Cleanup var cleanupTask = ((SocketOutput)socketOutput).WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } [Theory] [MemberData(nameof(MaxResponseBufferSizeData))] public async Task ProducingStartAndProducingCompleteCanBeUsedDirectly(KestrelServerOptions options) { int nBuffers = 0; var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { nBuffers = buffers; triggerCompleted(0); return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp); // block 1 var start = socketOutput.ProducingStart(); start.Block.End = start.Block.Data.Offset + start.Block.Data.Count; // block 2 var block2 = kestrelThread.Memory.Lease(); block2.End = block2.Data.Offset + block2.Data.Count; start.Block.Next = block2; var end = new MemoryPoolIterator(block2, block2.End); socketOutput.ProducingComplete(end); // A call to Write is required to ensure a write is scheduled var ignore = socketOutput.WriteAsync(default(ArraySegment), default(CancellationToken)); await mockLibuv.OnPostTask; Assert.Equal(2, nBuffers); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); } } [Theory] [MemberData(nameof(MaxResponseBufferSizeData))] public async Task OnlyAllowsUpToThreeConcurrentWrites(KestrelServerOptions options) { var writeCalled = false; var completeQueue = new ConcurrentQueue>(); var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { writeCalled = true; completeQueue.Enqueue(triggerCompleted); return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var mockConnection = new MockConnection(options); var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp); var buffer = new ArraySegment(new byte[1]); // First three writes trigger uv_write var ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); await mockLibuv.OnPostTask; Assert.True(writeCalled); writeCalled = false; ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); await mockLibuv.OnPostTask; Assert.True(writeCalled); writeCalled = false; ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); await mockLibuv.OnPostTask; Assert.True(writeCalled); writeCalled = false; // The fourth write won't trigger uv_write since the first three haven't completed ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); await mockLibuv.OnPostTask; Assert.False(writeCalled); // Complete 1st write allowing uv_write to be triggered again Action triggerNextCompleted; Assert.True(completeQueue.TryDequeue(out triggerNextCompleted)); triggerNextCompleted(0); await mockLibuv.OnPostTask; Assert.True(writeCalled); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); // Wait for all writes to complete so the completeQueue isn't modified during enumeration. await mockLibuv.OnPostTask; foreach (var triggerCompleted in completeQueue) { triggerCompleted(0); } } } [Theory] [MemberData(nameof(MaxResponseBufferSizeData))] public async Task WritesAreAggregated(KestrelServerOptions options) { var writeCalled = false; var writeCount = 0; var mockLibuv = new MockLibuv { OnWrite = (socket, buffers, triggerCompleted) => { writeCount++; triggerCompleted(0); writeCalled = true; return 0; } }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(new KestrelServerOptions()), "0", trace, ltp); mockLibuv.KestrelThreadBlocker.Reset(); var buffer = new ArraySegment(new byte[1]); // Two calls to WriteAsync trigger uv_write once if both calls // are made before write is scheduled var ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); mockLibuv.KestrelThreadBlocker.Set(); await mockLibuv.OnPostTask; Assert.True(writeCalled); writeCalled = false; // Write isn't called twice after the thread is unblocked await mockLibuv.OnPostTask; Assert.False(writeCalled); // One call to ScheduleWrite Assert.Equal(1, mockLibuv.PostCount); // One call to uv_write Assert.Equal(1, writeCount); // Cleanup var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); } } [Fact] public async Task ProducingStartAndProducingCompleteCanBeCalledAfterConnectionClose() { var mockLibuv = new MockLibuv(); using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) { var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1); kestrelEngine.Threads.Add(kestrelThread); await kestrelThread.StartAsync(); var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new SynchronousThreadPool(); var connection = new MockConnection(new KestrelServerOptions()); var socketOutput = new SocketOutput(kestrelThread, socket, connection, "0", trace, ltp); // Close SocketOutput var cleanupTask = socketOutput.WriteAsync( default(ArraySegment), default(CancellationToken), socketDisconnect: true); await mockLibuv.OnPostTask; Assert.Equal(TaskStatus.RanToCompletion, connection.SocketClosed.Status); var start = socketOutput.ProducingStart(); Assert.True(start.IsDefault); // ProducingComplete should not throw given a default iterator socketOutput.ProducingComplete(start); } } } }