Merge remote-tracking branch 'refs/remotes/aspnet/shalter/always-immediate' into combined-fixes

This commit is contained in:
Ben Adams 2016-01-26 02:15:56 +00:00
commit 9f2d685cbf
8 changed files with 117 additions and 119 deletions

View File

@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
_memory = memory;
}
public void Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
public void Write(ArraySegment<byte> buffer, bool chunk)
{
lock (_writeLock)
{
@ -47,10 +47,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
}
}
public Task WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
public Task WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
// TODO: Use _outputStream.WriteAsync
Write(buffer, immediate, chunk);
Write(buffer, chunk);
return TaskUtilities.CompletedTask;
}

View File

@ -312,7 +312,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
await ProduceStartAndFireOnStarting(immediate: true);
await ProduceStartAndFireOnStarting();
// Force flush
await SocketOutput.WriteAsync(_emptyData);
return DuplexStream;
}

View File

@ -399,19 +399,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
public void Flush()
{
ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult();
SocketOutput.Write(_emptyData, immediate: true);
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();
SocketOutput.Write(_emptyData);
}
public async Task FlushAsync(CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting(immediate: false);
await SocketOutput.WriteAsync(_emptyData, immediate: true, cancellationToken: cancellationToken);
await ProduceStartAndFireOnStarting();
await SocketOutput.WriteAsync(_emptyData, cancellationToken: cancellationToken);
}
public void Write(ArraySegment<byte> data)
{
ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult();
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();
if (_autoChunk)
{
@ -423,7 +423,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
else
{
SocketOutput.Write(data, immediate: true);
SocketOutput.Write(data);
}
}
@ -444,13 +444,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
else
{
return SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken);
return SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
}
}
public async Task WriteAsyncAwaited(ArraySegment<byte> data, CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting(immediate: false);
await ProduceStartAndFireOnStarting();
if (_autoChunk)
{
@ -462,23 +462,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
else
{
await SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken);
await SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
}
}
private void WriteChunked(ArraySegment<byte> data)
{
SocketOutput.Write(data, immediate: true, chunk: true);
SocketOutput.Write(data, chunk: true);
}
private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
return SocketOutput.WriteAsync(data, immediate: true, chunk: true, cancellationToken: cancellationToken);
return SocketOutput.WriteAsync(data, chunk: true, cancellationToken: cancellationToken);
}
private Task WriteChunkedResponseSuffix()
{
return SocketOutput.WriteAsync(_endChunkedResponseBytes, immediate: true);
return SocketOutput.WriteAsync(_endChunkedResponseBytes);
}
private static ArraySegment<byte> CreateAsciiByteArraySegment(string text)
@ -500,13 +500,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
public Task ProduceStartAndFireOnStarting(bool immediate = true)
public Task ProduceStartAndFireOnStarting()
{
if (_responseStarted) return TaskUtilities.CompletedTask;
if (_onStarting != null)
{
return FireOnStartingProduceStart(immediate: immediate);
return ProduceStartAndFireOnStartingAwaited();
}
if (_applicationException != null)
@ -516,10 +516,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
_applicationException);
}
return ProduceStart(immediate, appCompleted: false);
ProduceStart(appCompleted: false);
return TaskUtilities.CompletedTask;
}
private async Task FireOnStartingProduceStart(bool immediate)
private async Task ProduceStartAndFireOnStartingAwaited()
{
await FireOnStarting();
@ -530,17 +532,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
_applicationException);
}
await ProduceStart(immediate, appCompleted: false);
ProduceStart(appCompleted: false);
}
private Task ProduceStart(bool immediate, bool appCompleted)
private void ProduceStart(bool appCompleted)
{
if (_responseStarted) return TaskUtilities.CompletedTask;
if (_responseStarted) return;
_responseStarted = true;
var statusBytes = ReasonPhrases.ToStatusBytes(StatusCode, ReasonPhrase);
return CreateResponseHeader(statusBytes, appCompleted, immediate);
CreateResponseHeader(statusBytes, appCompleted);
}
protected Task ProduceEnd()
@ -563,7 +565,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
if (!_responseStarted)
{
return ProduceEndAwaited();
@ -574,7 +575,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private async Task ProduceEndAwaited()
{
await ProduceStart(immediate: true, appCompleted: true);
ProduceStart(appCompleted: true);
// Force flush
await SocketOutput.WriteAsync(_emptyData);
await WriteSuffix();
}
@ -606,13 +610,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
private Task CreateResponseHeader(
private void CreateResponseHeader(
byte[] statusBytes,
bool appCompleted,
bool immediate)
bool appCompleted)
{
var begin = SocketOutput.ProducingStart();
var end = begin;
var end = SocketOutput.ProducingStart();
if (_keepAlive)
{
foreach (var connectionValue in _responseHeaders.HeaderConnection)
@ -666,15 +668,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
end.CopyFrom(_bytesEndHeaders, 0, _bytesEndHeaders.Length);
SocketOutput.ProducingComplete(end);
if (immediate)
{
return SocketOutput.WriteAsync(default(ArraySegment<byte>), immediate: true);
}
else
{
return TaskUtilities.CompletedTask;
}
}
protected bool TakeStartLine(SocketInput input)

View File

@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
/// </summary>
public interface ISocketOutput
{
void Write(ArraySegment<byte> buffer, bool immediate = true, bool chunk = false);
Task WriteAsync(ArraySegment<byte> buffer, bool immediate = true, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken));
void Write(ArraySegment<byte> buffer, bool chunk = false);
Task WriteAsync(ArraySegment<byte> buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// Returns an iterator pointing to the tail of the response buffer. Response data can be appended

View File

@ -80,7 +80,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
public Task WriteAsync(
ArraySegment<byte> buffer,
CancellationToken cancellationToken,
bool immediate = true,
bool chunk = false,
bool socketShutdownSend = false,
bool socketDisconnect = false,
@ -144,13 +143,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
_nextWriteContext.SocketDisconnect = true;
}
if (!immediate)
{
// immediate==false calls always return complete tasks, because there is guaranteed
// to be a subsequent immediate==true call which will go down one of the previous code-paths
_numBytesPreCompleted += buffer.Count;
}
else if (_lastWriteError == null &&
if (_lastWriteError == null &&
_tasksPending.Count == 0 &&
_numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted)
{
@ -192,7 +185,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
if (!_writePending && immediate)
if (!_writePending)
{
_writePending = true;
scheduleWrite = true;
@ -215,14 +208,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
case ProduceEndType.SocketShutdownSend:
WriteAsync(default(ArraySegment<byte>),
default(CancellationToken),
immediate: true,
socketShutdownSend: true,
socketDisconnect: false);
break;
case ProduceEndType.SocketDisconnect:
WriteAsync(default(ArraySegment<byte>),
default(CancellationToken),
immediate: true,
socketShutdownSend: false,
socketDisconnect: true);
break;
@ -481,12 +472,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
void ISocketOutput.Write(ArraySegment<byte> buffer, bool chunk)
{
WriteAsync(buffer, CancellationToken.None, immediate, chunk, isSync: true).GetAwaiter().GetResult();
WriteAsync(buffer, CancellationToken.None, chunk, isSync: true).GetAwaiter().GetResult();
}
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@ -499,7 +490,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
return TaskUtilities.CompletedTask;
}
return WriteAsync(buffer, cancellationToken, immediate, chunk);
return WriteAsync(buffer, cancellationToken, chunk);
}
private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2 end, out int bytes, out int buffers)

View File

@ -3,6 +3,7 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Testing.xunit;
using Xunit;
@ -166,6 +167,50 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
}
[ConditionalFact]
[FrameworkSkipCondition(RuntimeFrameworks.Mono, SkipReason = "Test hangs after execution on Mono.")]
public async Task WritesAreFlushedPriorToResponseCompletion()
{
var flushWh = new ManualResetEventSlim();
using (var server = new TestServer(async httpContext =>
{
var response = httpContext.Response;
response.Headers.Clear();
await response.Body.WriteAsync(Encoding.ASCII.GetBytes("Hello "), 0, 6);
// Don't complete response until client has received the first chunk.
flushWh.Wait();
await response.Body.WriteAsync(Encoding.ASCII.GetBytes("World!"), 0, 6);
}))
{
using (var connection = new TestConnection())
{
await connection.SendEnd(
"GET / HTTP/1.1",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
"Transfer-Encoding: chunked",
"",
"6",
"Hello ",
"");
flushWh.Set();
await connection.ReceiveEnd(
"6",
"World!",
"0",
"",
"");
}
}
}
}
}

View File

@ -121,11 +121,12 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
[Fact]
public void WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted()
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyPreCompleted()
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new Queue<Action<int>>();
var writeRequestedWh = new ManualResetEventSlim();
// Arrange
var mockLibuv = new MockLibuv
@ -133,6 +134,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
OnWrite = (socket, buffers, triggerCompleted) =>
{
completeQueue.Enqueue(triggerCompleted);
writeRequestedWh.Set();
return 0;
}
};
@ -148,53 +150,39 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var bufferSize = maxBytesPreCompleted / 2;
var data = new byte[bufferSize];
var fullBuffer = new ArraySegment<byte>(data, 0, bufferSize);
var halfBuffer = new ArraySegment<byte>(data, 0, bufferSize / 2);
var completedWh = new ManualResetEventSlim();
Action<Task> onCompleted = (Task t) =>
{
Assert.Null(t.Exception);
completedWh.Set();
};
var halfWriteBehindBuffer = new ArraySegment<byte>(data, 0, bufferSize);
// Act
socketOutput.WriteAsync(halfBuffer, default(CancellationToken), false).ContinueWith(onCompleted);
var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer);
// Assert
// The first write should pre-complete since it is not immediate.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.WriteAsync(halfBuffer, default(CancellationToken)).ContinueWith(onCompleted);
// Assert
// The second write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// Act
socketOutput.WriteAsync(halfBuffer, default(CancellationToken), false).ContinueWith(onCompleted);
// Assert
// The third write should pre-complete since it is not immediate, even though too many.
Assert.True(completedWh.Wait(1000));
// Arrange
completedWh.Reset();
// The first write should pre-complete since it is <= _maxBytesPreCompleted.
Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status);
Assert.True(writeRequestedWh.Wait(1000));
writeRequestedWh.Reset();
// Add more bytes to the write-behind buffer to prevent the next write from
var iter = socketOutput.ProducingStart();
iter.CopyFrom(halfWriteBehindBuffer);
socketOutput.ProducingComplete(iter);
// Act
socketOutput.WriteAsync(halfBuffer, default(CancellationToken)).ContinueWith(onCompleted);
var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer);
// Assert
// Too many bytes are already pre-completed for the fourth write to pre-complete.
Assert.False(completedWh.Wait(1000));
Assert.True(writeRequestedWh.Wait(1000));
Assert.False(writeTask2.IsCompleted);
// 2 calls have been made to uv_write
Assert.Equal(2, completeQueue.Count);
// Act
while (completeQueue.Count > 0)
{
completeQueue.Dequeue()(0);
}
completeQueue.Dequeue()(0);
// Assert
// Finishing the first write should allow the second write to pre-complete.
Assert.True(completedWh.Wait(1000));
Assert.True(writeTask2.Wait(1000));
}
}

View File

@ -11,30 +11,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.LibuvCopier
{
try
{
var packagesFolder = Environment.GetEnvironmentVariable("DNX_PACKAGES");
if (string.IsNullOrEmpty(packagesFolder))
{
var dnxFolder = Environment.GetEnvironmentVariable("DNX_HOME") ??
Environment.GetEnvironmentVariable("DNX_USER_HOME") ??
Environment.GetEnvironmentVariable("DNX_GLOBAL_HOME");
var firstCandidate = dnxFolder?.Split(';')
?.Select(path => Environment.ExpandEnvironmentVariables(path))
?.Where(path => Directory.Exists(path))
?.FirstOrDefault();
if (string.IsNullOrEmpty(firstCandidate))
{
dnxFolder = Path.Combine(GetHome(), ".dnx");
}
else
{
dnxFolder = firstCandidate;
}
packagesFolder = Path.Combine(dnxFolder, "packages");
}
var packagesFolder = Environment.GetEnvironmentVariable("DNX_PACKAGES") ??
Path.Combine(GetHome(), ".nuget", "packages");
packagesFolder = Environment.ExpandEnvironmentVariables(packagesFolder);