diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs index ccc752ae9b..4b6b335c77 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs @@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter _memory = memory; } - public void Write(ArraySegment buffer, bool immediate, bool chunk) + public void Write(ArraySegment buffer, bool chunk) { lock (_writeLock) { @@ -47,10 +47,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter } } - public Task WriteAsync(ArraySegment buffer, bool immediate, bool chunk, CancellationToken cancellationToken) + public Task WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) { // TODO: Use _outputStream.WriteAsync - Write(buffer, immediate, chunk); + Write(buffer, chunk); return TaskUtilities.CompletedTask; } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs index d5225447d1..4c1d096e6f 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs @@ -393,13 +393,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http public void Flush() { ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult(); - SocketOutput.Write(_emptyData, immediate: true); + SocketOutput.Write(_emptyData); } public async Task FlushAsync(CancellationToken cancellationToken) { await ProduceStartAndFireOnStarting(immediate: false); - await SocketOutput.WriteAsync(_emptyData, immediate: true, cancellationToken: cancellationToken); + await SocketOutput.WriteAsync(_emptyData, cancellationToken: cancellationToken); } public void Write(ArraySegment data) @@ -416,7 +416,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } else { - SocketOutput.Write(data, immediate: true); + SocketOutput.Write(data); } } @@ -437,7 +437,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } else { - return SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken); + return SocketOutput.WriteAsync(data, cancellationToken: cancellationToken); } } @@ -455,23 +455,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } else { - await SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken); + await SocketOutput.WriteAsync(data, cancellationToken: cancellationToken); } } private void WriteChunked(ArraySegment data) { - SocketOutput.Write(data, immediate: false, chunk: true); + SocketOutput.Write(data, chunk: true); } private Task WriteChunkedAsync(ArraySegment data, CancellationToken cancellationToken) { - return SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken); + return SocketOutput.WriteAsync(data, chunk: true, cancellationToken: cancellationToken); } private Task WriteChunkedResponseSuffix() { - return SocketOutput.WriteAsync(_endChunkedResponseBytes, immediate: true); + return SocketOutput.WriteAsync(_endChunkedResponseBytes); } private static ArraySegment CreateAsciiByteArraySegment(string text) @@ -493,13 +493,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - public Task ProduceStartAndFireOnStarting(bool immediate = true) + public Task ProduceStartAndFireOnStarting(bool immediate) { if (_responseStarted) return TaskUtilities.CompletedTask; if (_onStarting != null) { - return FireOnStartingProduceStart(immediate: immediate); + return FireOnStartingProduceStart(immediate); } if (_applicationException != null) @@ -604,8 +604,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http bool appCompleted, bool immediate) { - var begin = SocketOutput.ProducingStart(); - var end = begin; + var end = SocketOutput.ProducingStart(); if (_keepAlive) { foreach (var connectionValue in _responseHeaders.HeaderConnection) @@ -662,7 +661,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http if (immediate) { - return SocketOutput.WriteAsync(default(ArraySegment), immediate: true); + // Force a call to uv_write + return SocketOutput.WriteAsync(default(ArraySegment)); } else { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs index 20e20481d3..c526caedf9 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs @@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http /// public interface ISocketOutput { - void Write(ArraySegment buffer, bool immediate = true, bool chunk = false); - Task WriteAsync(ArraySegment buffer, bool immediate = true, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)); + void Write(ArraySegment buffer, bool chunk = false); + Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)); /// /// Returns an iterator pointing to the tail of the response buffer. Response data can be appended diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs index 15a56f9fc8..7787c93a8f 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs @@ -78,7 +78,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http public Task WriteAsync( ArraySegment buffer, - bool immediate = true, bool chunk = false, bool socketShutdownSend = false, bool socketDisconnect = false, @@ -130,13 +129,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http _nextWriteContext.SocketDisconnect = true; } - if (!immediate) - { - // immediate==false calls always return complete tasks, because there is guaranteed - // to be a subsequent immediate==true call which will go down one of the previous code-paths - _numBytesPreCompleted += buffer.Count; - } - else if (_lastWriteError == null && + if (_lastWriteError == null && _tasksPending.Count == 0 && _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted) { @@ -155,7 +148,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http }); } - if (!_writePending && immediate) + if (!_writePending) { _writePending = true; scheduleWrite = true; @@ -177,13 +170,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { case ProduceEndType.SocketShutdownSend: WriteAsync(default(ArraySegment), - immediate: true, socketShutdownSend: true, socketDisconnect: false); break; case ProduceEndType.SocketDisconnect: WriteAsync(default(ArraySegment), - immediate: true, socketShutdownSend: false, socketDisconnect: true); break; @@ -391,14 +382,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - void ISocketOutput.Write(ArraySegment buffer, bool immediate, bool chunk) + void ISocketOutput.Write(ArraySegment buffer, bool chunk) { - WriteAsync(buffer, immediate, chunk, isSync: true).GetAwaiter().GetResult(); + WriteAsync(buffer, chunk, isSync: true).GetAwaiter().GetResult(); } - Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, bool chunk, CancellationToken cancellationToken) + Task ISocketOutput.WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) { - return WriteAsync(buffer, immediate, chunk); + return WriteAsync(buffer, chunk); } private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2 end, out int bytes, out int buffers) diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs index 42a08006f2..0b325f8710 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs @@ -3,6 +3,7 @@ using System; using System.Text; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Testing.xunit; using Xunit; @@ -166,6 +167,50 @@ namespace Microsoft.AspNetCore.Server.KestrelTests } } } + + [ConditionalFact] + [FrameworkSkipCondition(RuntimeFrameworks.Mono, SkipReason = "Test hangs after execution on Mono.")] + public async Task WritesAreFlushedPriorToResponseCompletion() + { + var flushWh = new ManualResetEventSlim(); + + using (var server = new TestServer(async httpContext => + { + var response = httpContext.Response; + response.Headers.Clear(); + await response.Body.WriteAsync(Encoding.ASCII.GetBytes("Hello "), 0, 6); + + // Don't complete response until client has received the first chunk. + flushWh.Wait(); + + await response.Body.WriteAsync(Encoding.ASCII.GetBytes("World!"), 0, 6); + })) + { + using (var connection = new TestConnection()) + { + await connection.SendEnd( + "GET / HTTP/1.1", + "", + ""); + await connection.Receive( + "HTTP/1.1 200 OK", + "Transfer-Encoding: chunked", + "", + "6", + "Hello ", + ""); + + flushWh.Set(); + + await connection.ReceiveEnd( + "6", + "World!", + "0", + "", + ""); + } + } + } } } diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/EngineTests.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/EngineTests.cs index a44f096129..6f36a7a2cf 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/EngineTests.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/EngineTests.cs @@ -1112,7 +1112,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests connectionCloseWh.Wait(); response.Headers.Clear(); - response.Headers["Content-Length"] = new[] { "5" }; try { diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs index 01c42c5f18..9fedc0dad4 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs @@ -121,11 +121,12 @@ namespace Microsoft.AspNetCore.Server.KestrelTests } [Fact] - public void WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted() + public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted() { // This should match _maxBytesPreCompleted in SocketOutput var maxBytesPreCompleted = 65536; var completeQueue = new Queue>(); + var writeRequestedWh = new ManualResetEventSlim(); // Arrange var mockLibuv = new MockLibuv @@ -133,6 +134,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); + writeRequestedWh.Set(); return 0; } }; @@ -148,53 +150,40 @@ namespace Microsoft.AspNetCore.Server.KestrelTests var ltp = new LoggingThreadPool(trace); var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); - var bufferSize = maxBytesPreCompleted; - + var bufferSize = maxBytesPreCompleted / 2; var data = new byte[bufferSize]; - var fullBuffer = new ArraySegment(data, 0, bufferSize); - var halfBuffer = new ArraySegment(data, 0, bufferSize / 2); - - var completedWh = new ManualResetEventSlim(); - Action onCompleted = (Task t) => - { - Assert.Null(t.Exception); - completedWh.Set(); - }; + var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); // Act - socketOutput.WriteAsync(halfBuffer, false).ContinueWith(onCompleted); + var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer); // Assert - // The first write should pre-complete since it is not immediate. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); - // Act - socketOutput.WriteAsync(halfBuffer).ContinueWith(onCompleted); - // Assert - // The second write should pre-complete since it is <= _maxBytesPreCompleted. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); - // Act - socketOutput.WriteAsync(halfBuffer, false).ContinueWith(onCompleted); - // Assert - // The third write should pre-complete since it is not immediate, even though too many. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); + // The first write should pre-complete since it is <= _maxBytesPreCompleted. + Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); + Assert.True(writeRequestedWh.Wait(1000)); + writeRequestedWh.Reset(); + + // Add more bytes to the write-behind buffer to prevent the next write from + var iter = socketOutput.ProducingStart(); + iter.CopyFrom(halfWriteBehindBuffer); + socketOutput.ProducingComplete(iter); + // Act - socketOutput.WriteAsync(halfBuffer).ContinueWith(onCompleted); + var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer); + // Assert // Too many bytes are already pre-completed for the fourth write to pre-complete. - Assert.False(completedWh.Wait(1000)); + Assert.True(writeRequestedWh.Wait(1000)); + Assert.False(writeTask2.IsCompleted); + + // 2 calls have been made to uv_write + Assert.Equal(2, completeQueue.Count); + // Act - while (completeQueue.Count > 0) - { - completeQueue.Dequeue()(0); - } + completeQueue.Dequeue()(0); + // Assert // Finishing the first write should allow the second write to pre-complete. - Assert.True(completedWh.Wait(1000)); + Assert.True(writeTask2.Wait(1000)); } }