Flush chunked writes before the entire response completes

Ensure chunked writes force an "immediate" write by removing the now
unused immediate parameter from ISocketOutput.Write
This commit is contained in:
Stephen Halter 2016-01-23 20:29:30 -08:00
parent 63e0ed8773
commit 7570da9daa
7 changed files with 96 additions and 72 deletions

View File

@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
_memory = memory;
}
public void Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
public void Write(ArraySegment<byte> buffer, bool chunk)
{
lock (_writeLock)
{
@ -47,10 +47,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
}
}
public Task WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
public Task WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
// TODO: Use _outputStream.WriteAsync
Write(buffer, immediate, chunk);
Write(buffer, chunk);
return TaskUtilities.CompletedTask;
}

View File

@ -393,13 +393,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
public void Flush()
{
ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult();
SocketOutput.Write(_emptyData, immediate: true);
SocketOutput.Write(_emptyData);
}
public async Task FlushAsync(CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting(immediate: false);
await SocketOutput.WriteAsync(_emptyData, immediate: true, cancellationToken: cancellationToken);
await SocketOutput.WriteAsync(_emptyData, cancellationToken: cancellationToken);
}
public void Write(ArraySegment<byte> data)
@ -416,7 +416,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
else
{
SocketOutput.Write(data, immediate: true);
SocketOutput.Write(data);
}
}
@ -437,7 +437,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
else
{
return SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken);
return SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
}
}
@ -455,23 +455,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
else
{
await SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken);
await SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
}
}
private void WriteChunked(ArraySegment<byte> data)
{
SocketOutput.Write(data, immediate: false, chunk: true);
SocketOutput.Write(data, chunk: true);
}
private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
return SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken);
return SocketOutput.WriteAsync(data, chunk: true, cancellationToken: cancellationToken);
}
private Task WriteChunkedResponseSuffix()
{
return SocketOutput.WriteAsync(_endChunkedResponseBytes, immediate: true);
return SocketOutput.WriteAsync(_endChunkedResponseBytes);
}
private static ArraySegment<byte> CreateAsciiByteArraySegment(string text)
@ -493,13 +493,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
public Task ProduceStartAndFireOnStarting(bool immediate = true)
public Task ProduceStartAndFireOnStarting(bool immediate)
{
if (_responseStarted) return TaskUtilities.CompletedTask;
if (_onStarting != null)
{
return FireOnStartingProduceStart(immediate: immediate);
return FireOnStartingProduceStart(immediate);
}
if (_applicationException != null)
@ -604,8 +604,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
bool appCompleted,
bool immediate)
{
var begin = SocketOutput.ProducingStart();
var end = begin;
var end = SocketOutput.ProducingStart();
if (_keepAlive)
{
foreach (var connectionValue in _responseHeaders.HeaderConnection)
@ -662,7 +661,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
if (immediate)
{
return SocketOutput.WriteAsync(default(ArraySegment<byte>), immediate: true);
// Force a call to uv_write
return SocketOutput.WriteAsync(default(ArraySegment<byte>));
}
else
{

View File

@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
/// </summary>
public interface ISocketOutput
{
void Write(ArraySegment<byte> buffer, bool immediate = true, bool chunk = false);
Task WriteAsync(ArraySegment<byte> buffer, bool immediate = true, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken));
void Write(ArraySegment<byte> buffer, bool chunk = false);
Task WriteAsync(ArraySegment<byte> buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// Returns an iterator pointing to the tail of the response buffer. Response data can be appended

View File

@ -78,7 +78,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
public Task WriteAsync(
ArraySegment<byte> buffer,
bool immediate = true,
bool chunk = false,
bool socketShutdownSend = false,
bool socketDisconnect = false,
@ -130,13 +129,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
_nextWriteContext.SocketDisconnect = true;
}
if (!immediate)
{
// immediate==false calls always return complete tasks, because there is guaranteed
// to be a subsequent immediate==true call which will go down one of the previous code-paths
_numBytesPreCompleted += buffer.Count;
}
else if (_lastWriteError == null &&
if (_lastWriteError == null &&
_tasksPending.Count == 0 &&
_numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted)
{
@ -155,7 +148,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
});
}
if (!_writePending && immediate)
if (!_writePending)
{
_writePending = true;
scheduleWrite = true;
@ -177,13 +170,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
case ProduceEndType.SocketShutdownSend:
WriteAsync(default(ArraySegment<byte>),
immediate: true,
socketShutdownSend: true,
socketDisconnect: false);
break;
case ProduceEndType.SocketDisconnect:
WriteAsync(default(ArraySegment<byte>),
immediate: true,
socketShutdownSend: false,
socketDisconnect: true);
break;
@ -391,14 +382,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
void ISocketOutput.Write(ArraySegment<byte> buffer, bool chunk)
{
WriteAsync(buffer, immediate, chunk, isSync: true).GetAwaiter().GetResult();
WriteAsync(buffer, chunk, isSync: true).GetAwaiter().GetResult();
}
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
return WriteAsync(buffer, immediate, chunk);
return WriteAsync(buffer, chunk);
}
private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2 end, out int bytes, out int buffers)

View File

@ -3,6 +3,7 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Testing.xunit;
using Xunit;
@ -166,6 +167,50 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
}
[ConditionalFact]
[FrameworkSkipCondition(RuntimeFrameworks.Mono, SkipReason = "Test hangs after execution on Mono.")]
public async Task WritesAreFlushedPriorToResponseCompletion()
{
var flushWh = new ManualResetEventSlim();
using (var server = new TestServer(async httpContext =>
{
var response = httpContext.Response;
response.Headers.Clear();
await response.Body.WriteAsync(Encoding.ASCII.GetBytes("Hello "), 0, 6);
// Don't complete response until client has received the first chunk.
flushWh.Wait();
await response.Body.WriteAsync(Encoding.ASCII.GetBytes("World!"), 0, 6);
}))
{
using (var connection = new TestConnection())
{
await connection.SendEnd(
"GET / HTTP/1.1",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
"Transfer-Encoding: chunked",
"",
"6",
"Hello ",
"");
flushWh.Set();
await connection.ReceiveEnd(
"6",
"World!",
"0",
"",
"");
}
}
}
}
}

View File

@ -1112,7 +1112,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
connectionCloseWh.Wait();
response.Headers.Clear();
response.Headers["Content-Length"] = new[] { "5" };
try
{

View File

@ -121,11 +121,12 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
[Fact]
public void WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted()
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted()
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
var writeRequestedWh = new ManualResetEventSlim();
// Arrange
var mockLibuv = new MockLibuv
@ -133,6 +134,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
OnWrite = (socket, buffers, triggerCompleted) =>
{
completeQueue.Enqueue(triggerCompleted);
writeRequestedWh.Set();
return 0;
}
};
@ -148,53 +150,40 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var bufferSize = maxBytesPreCompleted / 2;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
var halfBuffer = new ArraySegment<byte>(data, 0, bufferSize / 2);
var completedWh = new ManualResetEventSlim();
Action<Task> onCompleted = (Task t) =>
{
Assert.Null(t.Exception);
completedWh.Set();
};
var halfWriteBehindBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
socketOutput.WriteAsync(halfBuffer, false).ContinueWith(onCompleted);
var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer);
// Assert
// The first write should pre-complete since it is not immediate.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.WriteAsync(halfBuffer).ContinueWith(onCompleted);
// Assert
// The second write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.WriteAsync(halfBuffer, false).ContinueWith(onCompleted);
// Assert
// The third write should pre-complete since it is not immediate, even though too many.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status);
Assert.True(writeRequestedWh.Wait(1000));
writeRequestedWh.Reset();
// Add more bytes to the write-behind buffer to prevent the next write from
var iter = socketOutput.ProducingStart();
iter.CopyFrom(halfWriteBehindBuffer);
socketOutput.ProducingComplete(iter);
// Act
socketOutput.WriteAsync(halfBuffer).ContinueWith(onCompleted);
var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer);
// Assert
// Too many bytes are already pre-completed for the fourth write to pre-complete.
Assert.False(completedWh.Wait(1000));
Assert.True(writeRequestedWh.Wait(1000));
Assert.False(writeTask2.IsCompleted);
// 2 calls have been made to uv_write
Assert.Equal(2, completeQueue.Count);
// Act
while (completeQueue.Count > 0)
{
completeQueue.Dequeue()(0);
}
completeQueue.Dequeue()(0);
// Assert
// Finishing the first write should allow the second write to pre-complete.
Assert.True(completedWh.Wait(1000));
Assert.True(writeTask2.Wait(1000));
}
}