Tweaks to socket transport (#1687)
- Style changes: Sort usings, use more var and C#7 - Added ConfigureAwait to stop dead locks with xunit - Remove duplicate handling of aborts in the receiver - Handle the case where output ends before input
This commit is contained in:
parent
efa0a48fb1
commit
aedd061865
|
|
@ -1,15 +1,15 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.Buffers;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.Buffers;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
||||
{
|
||||
|
|
@ -40,72 +40,89 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
|
||||
public async void Start(IConnectionHandler connectionHandler)
|
||||
{
|
||||
_connectionContext = connectionHandler.OnConnection(this);
|
||||
try
|
||||
{
|
||||
_connectionContext = connectionHandler.OnConnection(this);
|
||||
|
||||
_input = _connectionContext.Input;
|
||||
_output = _connectionContext.Output;
|
||||
_input = _connectionContext.Input;
|
||||
_output = _connectionContext.Output;
|
||||
|
||||
// Spawn send and receive logic
|
||||
Task receiveTask = DoReceive();
|
||||
Task sendTask = DoSend();
|
||||
// Spawn send and receive logic
|
||||
Task receiveTask = DoReceive();
|
||||
Task sendTask = DoSend();
|
||||
|
||||
// Wait for them to complete (note they won't throw exceptions)
|
||||
await receiveTask;
|
||||
await sendTask;
|
||||
// Wait for eiher of them to complete (note they won't throw exceptions)
|
||||
await Task.WhenAny(receiveTask, sendTask);
|
||||
|
||||
_socket.Dispose();
|
||||
// Shut the socket down and wait for both sides to end
|
||||
_socket.Shutdown(SocketShutdown.Both);
|
||||
|
||||
_connectionContext.OnConnectionClosed();
|
||||
// Now wait for both to complete
|
||||
await receiveTask;
|
||||
await sendTask;
|
||||
|
||||
// Dispose the socket
|
||||
_socket.Dispose();
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// TODO: Log
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Mark the connection as closed after disposal
|
||||
_connectionContext.OnConnectionClosed();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async Task DoReceive()
|
||||
{
|
||||
try
|
||||
{
|
||||
bool done = false;
|
||||
while (!done)
|
||||
while (true)
|
||||
{
|
||||
// Ensure we have some reasonable amount of buffer space
|
||||
WritableBuffer buffer = _input.Alloc(MinAllocBufferSize);
|
||||
var buffer = _input.Alloc(MinAllocBufferSize);
|
||||
|
||||
int bytesReceived;
|
||||
|
||||
try
|
||||
{
|
||||
bytesReceived = await _socket.ReceiveAsync(GetArraySegment(buffer.Buffer), SocketFlags.None);
|
||||
}
|
||||
catch (Exception ex)
|
||||
catch (Exception)
|
||||
{
|
||||
buffer.Commit();
|
||||
_connectionContext.Abort(ex);
|
||||
_input.Complete(ex);
|
||||
break;
|
||||
throw;
|
||||
}
|
||||
|
||||
if (bytesReceived == 0)
|
||||
{
|
||||
// EOF
|
||||
Exception ex = new TaskCanceledException();
|
||||
buffer.Commit();
|
||||
_connectionContext.Abort(ex);
|
||||
_input.Complete(ex);
|
||||
break;
|
||||
|
||||
// We receive a FIN so throw an exception so that we cancel the input
|
||||
// with an error
|
||||
throw new TaskCanceledException("The request was aborted");
|
||||
}
|
||||
|
||||
// record what data we filled into the buffer and push to pipe
|
||||
// Record what data we filled into the buffer and push to pipe
|
||||
buffer.Advance(bytesReceived);
|
||||
|
||||
var result = await buffer.FlushAsync();
|
||||
if (result.IsCompleted)
|
||||
{
|
||||
// Pipe consumer is shut down
|
||||
// Pipe consumer is shut down, do we stop writing
|
||||
_socket.Shutdown(SocketShutdown.Receive);
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_input.Complete();
|
||||
}
|
||||
catch (Exception)
|
||||
catch (Exception ex)
|
||||
{
|
||||
// We don't expect any exceptions here, but eat it anyway as caller does not handle this.
|
||||
Debug.Assert(false);
|
||||
_connectionContext.Abort(ex);
|
||||
_input.Complete(ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -132,12 +149,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
{
|
||||
try
|
||||
{
|
||||
bool done = false;
|
||||
while (!done)
|
||||
while (true)
|
||||
{
|
||||
// Wait for data to write from the pipe producer
|
||||
ReadResult result = await _output.ReadAsync();
|
||||
ReadableBuffer buffer = result.Buffer;
|
||||
var result = await _output.ReadAsync();
|
||||
var buffer = result.Buffer;
|
||||
|
||||
try
|
||||
{
|
||||
|
|
@ -150,6 +166,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
else
|
||||
{
|
||||
SetupSendBuffers(buffer);
|
||||
|
||||
try
|
||||
{
|
||||
await _socket.SendAsync(_sendBufferList, SocketFlags.None);
|
||||
|
|
@ -170,8 +187,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
|
||||
if (buffer.IsEmpty && result.IsCompleted)
|
||||
{
|
||||
// Send a FIN
|
||||
_socket.Shutdown(SocketShutdown.Send);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -192,8 +207,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
|
||||
private static ArraySegment<byte> GetArraySegment(Buffer<byte> buffer)
|
||||
{
|
||||
ArraySegment<byte> segment;
|
||||
if (!buffer.TryGetArray(out segment))
|
||||
if (!buffer.TryGetArray(out var segment))
|
||||
{
|
||||
throw new InvalidOperationException();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,13 +1,12 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
||||
{
|
||||
|
|
@ -85,7 +84,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
listenSocket.Dispose();
|
||||
|
||||
Debug.Assert(_listenTask != null);
|
||||
await _listenTask;
|
||||
await _listenTask.ConfigureAwait(false);
|
||||
_listenTask = null;
|
||||
}
|
||||
}
|
||||
|
|
@ -101,11 +100,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
{
|
||||
while (true)
|
||||
{
|
||||
Socket acceptSocket = await _listenSocket.AcceptAsync();
|
||||
var acceptSocket = await _listenSocket.AcceptAsync();
|
||||
|
||||
acceptSocket.NoDelay = _endPointInformation.NoDelay;
|
||||
|
||||
SocketConnection connection = new SocketConnection(acceptSocket, this);
|
||||
var connection = new SocketConnection(acceptSocket, this);
|
||||
connection.Start(_handler);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
|
||||
using System;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue