Reduce Sockets mainloop Send/Receive statemachine size (#2376)

* Use Completion to Advance in Error

* Drop ReadResult from statemachine
This commit is contained in:
Ben Adams 2018-03-14 02:04:12 -04:00 committed by David Fowler
parent 572627e88c
commit e65e58daf3
1 changed files with 70 additions and 63 deletions

View File

@ -19,7 +19,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
internal sealed class SocketConnection : TransportConnection
{
private const int MinAllocBufferSize = 2048;
public static bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
public readonly static bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
private readonly Socket _socket;
private readonly PipeScheduler _scheduler;
@ -104,41 +104,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
try
{
while (true)
{
// Ensure we have some reasonable amount of buffer space
var buffer = Input.GetMemory(MinAllocBufferSize);
var bytesReceived = await _receiver.ReceiveAsync(buffer);
if (bytesReceived == 0)
{
// FIN
_trace.ConnectionReadFin(ConnectionId);
break;
}
Input.Advance(bytesReceived);
var flushTask = Input.FlushAsync();
if (!flushTask.IsCompleted)
{
_trace.ConnectionPause(ConnectionId);
await flushTask;
_trace.ConnectionResume(ConnectionId);
}
var result = flushTask.GetAwaiter().GetResult();
if (result.IsCompleted)
{
// Pipe consumer is shut down, do we stop writing
break;
}
}
await ProcessReceives();
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
{
@ -186,39 +152,51 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
}
}
private async Task ProcessReceives()
{
while (true)
{
// Ensure we have some reasonable amount of buffer space
var buffer = Input.GetMemory(MinAllocBufferSize);
var bytesReceived = await _receiver.ReceiveAsync(buffer);
if (bytesReceived == 0)
{
// FIN
_trace.ConnectionReadFin(ConnectionId);
break;
}
Input.Advance(bytesReceived);
var flushTask = Input.FlushAsync();
if (!flushTask.IsCompleted)
{
_trace.ConnectionPause(ConnectionId);
await flushTask;
_trace.ConnectionResume(ConnectionId);
}
var result = flushTask.GetAwaiter().GetResult();
if (result.IsCompleted)
{
// Pipe consumer is shut down, do we stop writing
break;
}
}
}
private async Task<Exception> DoSend()
{
Exception error = null;
try
{
while (true)
{
// Wait for data to write from the pipe producer
var result = await Output.ReadAsync();
var buffer = result.Buffer;
if (result.IsCanceled)
{
break;
}
try
{
if (!buffer.IsEmpty)
{
await _sender.SendAsync(buffer);
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
Output.AdvanceTo(buffer.End);
}
}
await ProcessSends();
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{
@ -248,5 +226,34 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
return error;
}
private async Task ProcessSends()
{
while (true)
{
// Wait for data to write from the pipe producer
var result = await Output.ReadAsync();
var buffer = result.Buffer;
if (result.IsCanceled)
{
break;
}
var end = buffer.End;
var isCompleted = result.IsCompleted;
if (!buffer.IsEmpty)
{
await _sender.SendAsync(buffer);
}
Output.AdvanceTo(end);
if (isCompleted)
{
break;
}
}
}
}
}