Prepare for OnReader/WriterCallbacks changes (#1791)

- This change does a few things:

1. It adds the events we will replace with
pipe events to IConnectionContext and IConnectionInformation to get out of
band notifications about pipe completions.

2. It also implements those callbacks
and exposing slight changes we'll need to make once we have them. The idea is
that we can delete/replace these methods once we have the new pipe API and things
will keep working.
This commit is contained in:
David Fowler 2017-04-29 00:41:48 -07:00 committed by GitHub
parent 3b2d0a52f3
commit 749e282102
9 changed files with 84 additions and 74 deletions

View File

@ -87,8 +87,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
}
public async void OnConnectionClosed()
public async void OnConnectionClosed(Exception ex)
{
// Abort the connection (if it isn't already aborted)
_frame.Abort(ex);
Log.ConnectionStop(ConnectionId);
KestrelEventSource.Log.ConnectionStop(this);
_socketClosedTcs.SetResult(null);

View File

@ -31,6 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
// this is temporary until it does
private TaskCompletionSource<object> _flushTcs;
private readonly object _flushLock = new object();
private Action _flushCompleted;
public OutputProducer(IPipeWriter pipe, Frame frame, string connectionId, IKestrelTrace log)
{
@ -38,6 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_frame = frame;
_connectionId = connectionId;
_log = log;
_flushCompleted = OnFlushCompleted;
}
public Task WriteAsync(
@ -83,8 +85,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
var awaitable = writableBuffer.FlushAsync(cancellationToken);
if (awaitable.IsCompleted)
{
AbortIfNeeded(awaitable);
// The flush task can't fail today
return TaskCache.CompletedTask;
}
@ -103,27 +103,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
_flushTcs = new TaskCompletionSource<object>();
awaitable.OnCompleted(() =>
{
AbortIfNeeded(awaitable);
_flushTcs.TrySetResult(null);
});
awaitable.OnCompleted(_flushCompleted);
}
}
await _flushTcs.Task;
if (cancellationToken.IsCancellationRequested)
{
_frame.Abort(error: null);
}
cancellationToken.ThrowIfCancellationRequested();
}
private void AbortIfNeeded(WritableBufferAwaitable awaitable)
private void OnFlushCompleted()
{
try
{
awaitable.GetResult();
}
catch (Exception ex)
{
_frame.Abort(ex);
}
_flushTcs.TrySetResult(null);
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool chunk)

View File

@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions
IPipeWriter Input { get; }
IPipeReader Output { get; }
// TODO: Remove these (Use Pipes Tasks instead?)
void OnConnectionClosed();
// TODO: Remove these (https://github.com/aspnet/KestrelHttpServer/issues/1772)
void OnConnectionClosed(Exception ex);
void Abort(Exception ex);
}
}

View File

@ -66,17 +66,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
// Start socket prior to applying the ConnectionAdapter
StartReading();
Exception error = null;
try
{
// This *must* happen after socket.ReadStart
// The socket output consumer is the only thing that can close the connection. If the
// output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards.
await Output.WriteOutputAsync();
_connectionContext.Output.Complete();
}
catch (UvException ex)
{
_connectionContext.Output.Complete(ex);
error = new IOException(ex.Message, ex);
}
finally
{
@ -91,7 +92,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
_socket.Dispose();
// Tell the kestrel we're done with this connection
_connectionContext.OnConnectionClosed();
_connectionContext.OnConnectionClosed(error);
_connectionContext.Output.Complete(error);
}
}
catch (Exception e)
@ -221,7 +223,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
// ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected).
// This should be treated the same as OnRead() seeing a "normalDone" condition.
Log.ConnectionReadFin(ConnectionId);
Input.Complete(new IOException(ex.Message, ex));
var error = new IOException(ex.Message, ex);
_connectionContext.Abort(error);
Input.Complete(error);
}
}
}

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;

View File

@ -7,7 +7,6 @@ using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.Buffers;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
@ -72,15 +71,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
// TODO: Log
}
finally
{
// Mark the connection as closed after disposal
_connectionContext.OnConnectionClosed();
}
}
private async Task DoReceive()
{
Exception error = null;
try
{
while (true)
@ -112,40 +108,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
break;
}
}
_connectionContext.Abort(ex: null);
_input.Complete();
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
{
error = new ConnectionResetException(ex.Message, ex);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{
error = new TaskCanceledException("The request was aborted");
}
catch (ObjectDisposedException)
{
error = new TaskCanceledException("The request was aborted");
}
catch (IOException ex)
{
error = ex;
}
catch (Exception ex)
{
Exception error = null;
if (ex is SocketException se)
{
if (se.SocketErrorCode == SocketError.ConnectionReset)
{
// Connection reset
error = new ConnectionResetException(ex.Message, ex);
}
else if (se.SocketErrorCode == SocketError.OperationAborted)
{
error = new TaskCanceledException("The request was aborted");
}
}
if (ex is ObjectDisposedException)
{
error = new TaskCanceledException("The request was aborted");
}
else if (ex is IOException ioe)
{
error = ioe;
}
else if (error == null)
{
error = new IOException(ex.Message, ex);
}
error = new IOException(ex.Message, ex);
}
finally
{
_connectionContext.Abort(error);
_input.Complete(error);
}
@ -172,6 +157,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
private async Task DoSend()
{
Exception error = null;
try
{
while (true)
@ -220,13 +207,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
_output.Advance(buffer.End);
}
}
// We're done reading
_output.Complete();
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{
error = null;
}
catch (ObjectDisposedException)
{
error = null;
}
catch (IOException ex)
{
error = ex;
}
catch (Exception ex)
{
_output.Complete(ex);
error = new IOException(ex.Message, ex);
}
finally
{
_connectionContext.OnConnectionClosed(error);
_output.Complete(error);
}
}

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;

View File

@ -9,10 +9,12 @@ using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers;
using Microsoft.AspNetCore.Testing;
using Moq;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
@ -282,11 +284,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
Assert.NotEmpty(completeQueue);
// Add more bytes to the write-behind buffer to prevent the next write from
((ISocketOutput) socketOutput).Write((writableBuffer, state) =>
{
writableBuffer.Write(state);
},
halfWriteBehindBuffer);
((ISocketOutput)socketOutput).Write((writableBuffer, state) =>
{
writableBuffer.Write(state);
},
halfWriteBehindBuffer);
// Act
var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken));
@ -679,12 +681,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
frame.RequestAborted.Register(cts.Cancel);
}
var ignore = WriteOutputAsync(consumer, pipe.Reader);
var ignore = WriteOutputAsync(consumer, pipe.Reader, frame);
return socketOutput;
}
private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader)
private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader, Frame frame)
{
// This WriteOutputAsync() calling code is equivalent to that in LibuvConnection.
try
@ -692,10 +694,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
// Ensure that outputReader.Complete() runs on the LibuvThread.
// Without ConfigureAwait(false), xunit will dispatch.
await consumer.WriteOutputAsync().ConfigureAwait(false);
frame.Abort(error: null);
outputReader.Complete();
}
catch (UvException ex)
{
frame.Abort(ex);
outputReader.Complete(ex);
}
}

View File

@ -35,12 +35,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
public IPipeWriter Input { get; set; }
public IPipeReader Output { get; set; }
public void OnConnectionClosed()
public void Abort(Exception ex)
{
}
public void Abort(Exception ex)
public void OnConnectionClosed(Exception ex)
{
}
}