diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs index ccc752ae9b..4b6b335c77 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/StreamSocketOutput.cs @@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter _memory = memory; } - public void Write(ArraySegment buffer, bool immediate, bool chunk) + public void Write(ArraySegment buffer, bool chunk) { lock (_writeLock) { @@ -47,10 +47,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter } } - public Task WriteAsync(ArraySegment buffer, bool immediate, bool chunk, CancellationToken cancellationToken) + public Task WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) { // TODO: Use _outputStream.WriteAsync - Write(buffer, immediate, chunk); + Write(buffer, chunk); return TaskUtilities.CompletedTask; } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.FeatureCollection.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.FeatureCollection.cs index 8a10a7c618..f93f88915d 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.FeatureCollection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.FeatureCollection.cs @@ -312,7 +312,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - await ProduceStartAndFireOnStarting(immediate: true); + await ProduceStartAndFireOnStarting(); + + // Force flush + await SocketOutput.WriteAsync(_emptyData); return DuplexStream; } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs index 61f09dbbb1..5f753a9558 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs @@ -399,19 +399,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http public void Flush() { - ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult(); - SocketOutput.Write(_emptyData, immediate: true); + ProduceStartAndFireOnStarting().GetAwaiter().GetResult(); + SocketOutput.Write(_emptyData); } public async Task FlushAsync(CancellationToken cancellationToken) { - await ProduceStartAndFireOnStarting(immediate: false); - await SocketOutput.WriteAsync(_emptyData, immediate: true, cancellationToken: cancellationToken); + await ProduceStartAndFireOnStarting(); + await SocketOutput.WriteAsync(_emptyData, cancellationToken: cancellationToken); } public void Write(ArraySegment data) { - ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult(); + ProduceStartAndFireOnStarting().GetAwaiter().GetResult(); if (_autoChunk) { @@ -423,7 +423,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } else { - SocketOutput.Write(data, immediate: true); + SocketOutput.Write(data); } } @@ -444,13 +444,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } else { - return SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken); + return SocketOutput.WriteAsync(data, cancellationToken: cancellationToken); } } public async Task WriteAsyncAwaited(ArraySegment data, CancellationToken cancellationToken) { - await ProduceStartAndFireOnStarting(immediate: false); + await ProduceStartAndFireOnStarting(); if (_autoChunk) { @@ -462,23 +462,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } else { - await SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken); + await SocketOutput.WriteAsync(data, cancellationToken: cancellationToken); } } private void WriteChunked(ArraySegment data) { - SocketOutput.Write(data, immediate: true, chunk: true); + SocketOutput.Write(data, chunk: true); } private Task WriteChunkedAsync(ArraySegment data, CancellationToken cancellationToken) { - return SocketOutput.WriteAsync(data, immediate: true, chunk: true, cancellationToken: cancellationToken); + return SocketOutput.WriteAsync(data, chunk: true, cancellationToken: cancellationToken); } private Task WriteChunkedResponseSuffix() { - return SocketOutput.WriteAsync(_endChunkedResponseBytes, immediate: true); + return SocketOutput.WriteAsync(_endChunkedResponseBytes); } private static ArraySegment CreateAsciiByteArraySegment(string text) @@ -500,13 +500,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - public Task ProduceStartAndFireOnStarting(bool immediate = true) + public Task ProduceStartAndFireOnStarting() { if (_responseStarted) return TaskUtilities.CompletedTask; if (_onStarting != null) { - return FireOnStartingProduceStart(immediate: immediate); + return ProduceStartAndFireOnStartingAwaited(); } if (_applicationException != null) @@ -516,10 +516,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http _applicationException); } - return ProduceStart(immediate, appCompleted: false); + ProduceStart(appCompleted: false); + + return TaskUtilities.CompletedTask; } - private async Task FireOnStartingProduceStart(bool immediate) + private async Task ProduceStartAndFireOnStartingAwaited() { await FireOnStarting(); @@ -530,17 +532,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http _applicationException); } - await ProduceStart(immediate, appCompleted: false); + ProduceStart(appCompleted: false); } - private Task ProduceStart(bool immediate, bool appCompleted) + private void ProduceStart(bool appCompleted) { - if (_responseStarted) return TaskUtilities.CompletedTask; + if (_responseStarted) return; _responseStarted = true; var statusBytes = ReasonPhrases.ToStatusBytes(StatusCode, ReasonPhrase); - return CreateResponseHeader(statusBytes, appCompleted, immediate); + CreateResponseHeader(statusBytes, appCompleted); } protected Task ProduceEnd() @@ -563,7 +565,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - if (!_responseStarted) { return ProduceEndAwaited(); @@ -574,7 +575,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http private async Task ProduceEndAwaited() { - await ProduceStart(immediate: true, appCompleted: true); + ProduceStart(appCompleted: true); + + // Force flush + await SocketOutput.WriteAsync(_emptyData); await WriteSuffix(); } @@ -606,13 +610,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - private Task CreateResponseHeader( + private void CreateResponseHeader( byte[] statusBytes, - bool appCompleted, - bool immediate) + bool appCompleted) { - var begin = SocketOutput.ProducingStart(); - var end = begin; + var end = SocketOutput.ProducingStart(); if (_keepAlive) { foreach (var connectionValue in _responseHeaders.HeaderConnection) @@ -666,15 +668,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http end.CopyFrom(_bytesEndHeaders, 0, _bytesEndHeaders.Length); SocketOutput.ProducingComplete(end); - - if (immediate) - { - return SocketOutput.WriteAsync(default(ArraySegment), immediate: true); - } - else - { - return TaskUtilities.CompletedTask; - } } protected bool TakeStartLine(SocketInput input) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs index 20e20481d3..c526caedf9 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs @@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http /// public interface ISocketOutput { - void Write(ArraySegment buffer, bool immediate = true, bool chunk = false); - Task WriteAsync(ArraySegment buffer, bool immediate = true, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)); + void Write(ArraySegment buffer, bool chunk = false); + Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)); /// /// Returns an iterator pointing to the tail of the response buffer. Response data can be appended diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs index eed0230e3e..7033d4a1fa 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs @@ -80,7 +80,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http public Task WriteAsync( ArraySegment buffer, CancellationToken cancellationToken, - bool immediate = true, bool chunk = false, bool socketShutdownSend = false, bool socketDisconnect = false, @@ -144,13 +143,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http _nextWriteContext.SocketDisconnect = true; } - if (!immediate) - { - // immediate==false calls always return complete tasks, because there is guaranteed - // to be a subsequent immediate==true call which will go down one of the previous code-paths - _numBytesPreCompleted += buffer.Count; - } - else if (_lastWriteError == null && + if (_lastWriteError == null && _tasksPending.Count == 0 && _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted) { @@ -192,7 +185,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - if (!_writePending && immediate) + if (!_writePending) { _writePending = true; scheduleWrite = true; @@ -215,14 +208,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http case ProduceEndType.SocketShutdownSend: WriteAsync(default(ArraySegment), default(CancellationToken), - immediate: true, socketShutdownSend: true, socketDisconnect: false); break; case ProduceEndType.SocketDisconnect: WriteAsync(default(ArraySegment), default(CancellationToken), - immediate: true, socketShutdownSend: false, socketDisconnect: true); break; @@ -481,12 +472,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - void ISocketOutput.Write(ArraySegment buffer, bool immediate, bool chunk) + void ISocketOutput.Write(ArraySegment buffer, bool chunk) { - WriteAsync(buffer, CancellationToken.None, immediate, chunk, isSync: true).GetAwaiter().GetResult(); + WriteAsync(buffer, CancellationToken.None, chunk, isSync: true).GetAwaiter().GetResult(); } - Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, bool chunk, CancellationToken cancellationToken) + Task ISocketOutput.WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { @@ -499,7 +490,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http return TaskUtilities.CompletedTask; } - return WriteAsync(buffer, cancellationToken, immediate, chunk); + return WriteAsync(buffer, cancellationToken, chunk); } private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2 end, out int bytes, out int buffers) diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs index 42a08006f2..0b325f8710 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/ChunkedResponseTests.cs @@ -3,6 +3,7 @@ using System; using System.Text; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Testing.xunit; using Xunit; @@ -166,6 +167,50 @@ namespace Microsoft.AspNetCore.Server.KestrelTests } } } + + [ConditionalFact] + [FrameworkSkipCondition(RuntimeFrameworks.Mono, SkipReason = "Test hangs after execution on Mono.")] + public async Task WritesAreFlushedPriorToResponseCompletion() + { + var flushWh = new ManualResetEventSlim(); + + using (var server = new TestServer(async httpContext => + { + var response = httpContext.Response; + response.Headers.Clear(); + await response.Body.WriteAsync(Encoding.ASCII.GetBytes("Hello "), 0, 6); + + // Don't complete response until client has received the first chunk. + flushWh.Wait(); + + await response.Body.WriteAsync(Encoding.ASCII.GetBytes("World!"), 0, 6); + })) + { + using (var connection = new TestConnection()) + { + await connection.SendEnd( + "GET / HTTP/1.1", + "", + ""); + await connection.Receive( + "HTTP/1.1 200 OK", + "Transfer-Encoding: chunked", + "", + "6", + "Hello ", + ""); + + flushWh.Set(); + + await connection.ReceiveEnd( + "6", + "World!", + "0", + "", + ""); + } + } + } } } diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs index 974829e93b..127a79477d 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs @@ -121,11 +121,12 @@ namespace Microsoft.AspNetCore.Server.KestrelTests } [Fact] - public void WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted() + public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted() { // This should match _maxBytesPreCompleted in SocketOutput var maxBytesPreCompleted = 65536; var completeQueue = new Queue>(); + var writeRequestedWh = new ManualResetEventSlim(); // Arrange var mockLibuv = new MockLibuv @@ -133,6 +134,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests OnWrite = (socket, buffers, triggerCompleted) => { completeQueue.Enqueue(triggerCompleted); + writeRequestedWh.Set(); return 0; } }; @@ -148,53 +150,39 @@ namespace Microsoft.AspNetCore.Server.KestrelTests var ltp = new LoggingThreadPool(trace); var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); - var bufferSize = maxBytesPreCompleted; - + var bufferSize = maxBytesPreCompleted / 2; var data = new byte[bufferSize]; - var fullBuffer = new ArraySegment(data, 0, bufferSize); - var halfBuffer = new ArraySegment(data, 0, bufferSize / 2); - - var completedWh = new ManualResetEventSlim(); - Action onCompleted = (Task t) => - { - Assert.Null(t.Exception); - completedWh.Set(); - }; + var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); // Act - socketOutput.WriteAsync(halfBuffer, default(CancellationToken), false).ContinueWith(onCompleted); + var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer); // Assert - // The first write should pre-complete since it is not immediate. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); - // Act - socketOutput.WriteAsync(halfBuffer, default(CancellationToken)).ContinueWith(onCompleted); - // Assert - // The second write should pre-complete since it is <= _maxBytesPreCompleted. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); - // Act - socketOutput.WriteAsync(halfBuffer, default(CancellationToken), false).ContinueWith(onCompleted); - // Assert - // The third write should pre-complete since it is not immediate, even though too many. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); + // The first write should pre-complete since it is <= _maxBytesPreCompleted. + Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); + Assert.True(writeRequestedWh.Wait(1000)); + writeRequestedWh.Reset(); + + // Add more bytes to the write-behind buffer to prevent the next write from + var iter = socketOutput.ProducingStart(); + iter.CopyFrom(halfWriteBehindBuffer); + socketOutput.ProducingComplete(iter); + // Act - socketOutput.WriteAsync(halfBuffer, default(CancellationToken)).ContinueWith(onCompleted); + var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer); // Assert // Too many bytes are already pre-completed for the fourth write to pre-complete. - Assert.False(completedWh.Wait(1000)); + Assert.True(writeRequestedWh.Wait(1000)); + Assert.False(writeTask2.IsCompleted); + + // 2 calls have been made to uv_write + Assert.Equal(2, completeQueue.Count); + // Act - while (completeQueue.Count > 0) - { - completeQueue.Dequeue()(0); - } + completeQueue.Dequeue()(0); + // Assert // Finishing the first write should allow the second write to pre-complete. - Assert.True(completedWh.Wait(1000)); + Assert.True(writeTask2.Wait(1000)); } } diff --git a/tools/Microsoft.AspNetCore.Server.Kestrel.LibuvCopier/Program.cs b/tools/Microsoft.AspNetCore.Server.Kestrel.LibuvCopier/Program.cs index 6fb635145a..8ed1e4df1a 100644 --- a/tools/Microsoft.AspNetCore.Server.Kestrel.LibuvCopier/Program.cs +++ b/tools/Microsoft.AspNetCore.Server.Kestrel.LibuvCopier/Program.cs @@ -11,30 +11,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.LibuvCopier { try { - var packagesFolder = Environment.GetEnvironmentVariable("DNX_PACKAGES"); - - if (string.IsNullOrEmpty(packagesFolder)) - { - var dnxFolder = Environment.GetEnvironmentVariable("DNX_HOME") ?? - Environment.GetEnvironmentVariable("DNX_USER_HOME") ?? - Environment.GetEnvironmentVariable("DNX_GLOBAL_HOME"); - - var firstCandidate = dnxFolder?.Split(';') - ?.Select(path => Environment.ExpandEnvironmentVariables(path)) - ?.Where(path => Directory.Exists(path)) - ?.FirstOrDefault(); - - if (string.IsNullOrEmpty(firstCandidate)) - { - dnxFolder = Path.Combine(GetHome(), ".dnx"); - } - else - { - dnxFolder = firstCandidate; - } - - packagesFolder = Path.Combine(dnxFolder, "packages"); - } + var packagesFolder = Environment.GetEnvironmentVariable("DNX_PACKAGES") ?? + Path.Combine(GetHome(), ".nuget", "packages"); packagesFolder = Environment.ExpandEnvironmentVariables(packagesFolder);