Remove the events on ConnectionContext (#2023)

- Use the pipe events and removed the Tasks from ConnectionContext
- Remove OnConnectionClosed from FrameConnection. Since the `FrameConnetion` is a single middleware, not the entire pipeline, we shouldn't need to wait on the connection close there.
- It seems like the callbacks are rooted on the pipe even after they fire. This needs to be investigated in pipelines.
This commit is contained in:
David Fowler 2017-08-26 20:19:55 -07:00 committed by GitHub
parent 09be7c416a
commit 7854c0604a
13 changed files with 34 additions and 106 deletions

View File

@ -28,7 +28,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private readonly FrameConnectionContext _context; private readonly FrameConnectionContext _context;
private IList<IAdaptedConnection> _adaptedConnections; private IList<IAdaptedConnection> _adaptedConnections;
private readonly TaskCompletionSource<object> _socketClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private Frame _frame; private Frame _frame;
private Http2Connection _http2Connection; private Http2Connection _http2Connection;
private volatile int _http2ConnectionState; private volatile int _http2ConnectionState;
@ -152,7 +151,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
} }
await adaptedPipelineTask; await adaptedPipelineTask;
await _socketClosedTcs.Task;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -191,13 +189,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}); });
} }
public void OnConnectionClosed(Exception ex)
{
Abort(ex);
_socketClosedTcs.TrySetResult(null);
}
public Task StopAsync() public Task StopAsync()
{ {
Debug.Assert(_frame != null, $"{nameof(_frame)} is null"); Debug.Assert(_frame != null, $"{nameof(_frame)} is null");

View File

@ -1,4 +1,5 @@
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Net; using System.Net;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -63,25 +64,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
var connection = new FrameConnection(frameConnectionContext); var connection = new FrameConnection(frameConnectionContext);
// The order here is important, start request processing so that
// the frame is created before this yields. Events need to be wired up
// afterwards
var processingTask = connection.StartRequestProcessing(_application); var processingTask = connection.StartRequestProcessing(_application);
// Wire up the events an forward calls to the frame connection var inputTcs = new TaskCompletionSource<object>();
// It's important that these execute synchronously because graceful
// connection close is order sensative (for now)
connectionContext.ConnectionAborted.ContinueWith((task, state) =>
{
// Unwrap the aggregate exception
((FrameConnection)state).Abort(task.Exception?.InnerException);
},
connection, TaskContinuationOptions.ExecuteSynchronously);
connectionContext.ConnectionClosed.ContinueWith((task, state) => // Abort the frame when the transport writer completes
connectionContext.Transport.Input.OnWriterCompleted((error, state) =>
{ {
// Unwrap the aggregate exception var tcs = (TaskCompletionSource<object>)state;
((FrameConnection)state).OnConnectionClosed(task.Exception?.InnerException);
if (error != null)
{
tcs.TrySetException(error);
}
else
{
tcs.TrySetResult(null);
}
},
inputTcs);
inputTcs.Task.ContinueWith((task, state) =>
{
((FrameConnection)state).Abort(task.Exception?.InnerException);
}, },
connection, TaskContinuationOptions.ExecuteSynchronously); connection, TaskContinuationOptions.ExecuteSynchronously);

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved. // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal

View File

@ -109,16 +109,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
set => Application = value; set => Application = value;
} }
Task IConnectionTransportFeature.ConnectionAborted
{
get => _abortTcs.Task;
}
Task IConnectionTransportFeature.ConnectionClosed
{
get => _closedTcs.Task;
}
object IFeatureCollection.this[Type key] object IFeatureCollection.this[Type key]
{ {
get => FastFeatureGet(key); get => FastFeatureGet(key);

View File

@ -1,15 +1,11 @@
using System; using System;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Net; using System.Net;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
{ {
public abstract partial class TransportConnection public abstract partial class TransportConnection
{ {
private readonly TaskCompletionSource<object> _abortTcs = new TaskCompletionSource<object>();
private readonly TaskCompletionSource<object> _closedTcs = new TaskCompletionSource<object>();
public TransportConnection() public TransportConnection()
{ {
_currentIConnectionIdFeature = this; _currentIConnectionIdFeature = this;
@ -31,28 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
public IPipeConnection Transport { get; set; } public IPipeConnection Transport { get; set; }
public IPipeConnection Application { get; set; } public IPipeConnection Application { get; set; }
protected void Abort(Exception exception) public IPipeWriter Input => Application.Output;
{ public IPipeReader Output => Application.Input;
if (exception == null)
{
_abortTcs.TrySetResult(null);
}
else
{
_abortTcs.TrySetException(exception);
}
}
protected void Close(Exception exception)
{
if (exception == null)
{
_closedTcs.TrySetResult(null);
}
else
{
_closedTcs.TrySetException(exception);
}
}
} }
} }

View File

@ -46,9 +46,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
} }
} }
public IPipeWriter Input => Application.Output;
public IPipeReader Output => Application.Input;
public LibuvOutputConsumer OutputConsumer { get; set; } public LibuvOutputConsumer OutputConsumer { get; set; }
private ILibuvTrace Log => ListenerContext.TransportContext.Log; private ILibuvTrace Log => ListenerContext.TransportContext.Log;
@ -83,7 +80,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
// Now, complete the input so that no more reads can happen // Now, complete the input so that no more reads can happen
Input.Complete(error ?? new ConnectionAbortedException()); Input.Complete(error ?? new ConnectionAbortedException());
Output.Complete(error); Output.Complete(error);
Close(error);
// Make sure it isn't possible for a paused read to resume reading after calling uv_close // Make sure it isn't possible for a paused read to resume reading after calling uv_close
// on the stream handle // on the stream handle
@ -178,7 +174,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
} }
} }
Abort(error);
// Complete after aborting the connection // Complete after aborting the connection
Input.Complete(error); Input.Complete(error);
} }
@ -216,7 +211,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
Log.ConnectionReadFin(ConnectionId); Log.ConnectionReadFin(ConnectionId);
var error = new IOException(ex.Message, ex); var error = new IOException(ex.Message, ex);
Abort(error);
Input.Complete(error); Input.Complete(error);
} }
} }

View File

@ -19,8 +19,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
private readonly Socket _socket; private readonly Socket _socket;
private readonly SocketTransport _transport; private readonly SocketTransport _transport;
private IPipeWriter _input;
private IPipeReader _output;
private IList<ArraySegment<byte>> _sendBufferList; private IList<ArraySegment<byte>> _sendBufferList;
private const int MinAllocBufferSize = 2048; private const int MinAllocBufferSize = 2048;
@ -48,9 +46,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{ {
connectionHandler.OnConnection(this); connectionHandler.OnConnection(this);
_input = Application.Output;
_output = Application.Input;
// Spawn send and receive logic // Spawn send and receive logic
Task receiveTask = DoReceive(); Task receiveTask = DoReceive();
Task sendTask = DoSend(); Task sendTask = DoSend();
@ -86,7 +81,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
while (true) while (true)
{ {
// Ensure we have some reasonable amount of buffer space // Ensure we have some reasonable amount of buffer space
var buffer = _input.Alloc(MinAllocBufferSize); var buffer = Input.Alloc(MinAllocBufferSize);
try try
{ {
@ -135,8 +130,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
} }
finally finally
{ {
Abort(error); Input.Complete(error);
_input.Complete(error);
} }
} }
@ -168,7 +162,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
while (true) while (true)
{ {
// Wait for data to write from the pipe producer // Wait for data to write from the pipe producer
var result = await _output.ReadAsync(); var result = await Output.ReadAsync();
var buffer = result.Buffer; var buffer = result.Buffer;
if (result.IsCancelled) if (result.IsCancelled)
@ -205,7 +199,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
} }
finally finally
{ {
_output.Advance(buffer.End); Output.Advance(buffer.End);
} }
} }
@ -229,8 +223,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
} }
finally finally
{ {
Close(error); Output.Complete(error);
_output.Complete(error);
} }
} }

View File

@ -15,9 +15,5 @@ namespace Microsoft.AspNetCore.Protocols
public abstract IPipeConnection Transport { get; set; } public abstract IPipeConnection Transport { get; set; }
public abstract PipeFactory PipeFactory { get; } public abstract PipeFactory PipeFactory { get; }
public abstract Task ConnectionAborted { get; }
public abstract Task ConnectionClosed { get; }
} }
} }

View File

@ -1,8 +1,4 @@
using System; using System.IO.Pipelines;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols.Features; using Microsoft.AspNetCore.Protocols.Features;
@ -39,10 +35,6 @@ namespace Microsoft.AspNetCore.Protocols
set => ConnectionTransportFeature.Transport = value; set => ConnectionTransportFeature.Transport = value;
} }
public override Task ConnectionAborted => ConnectionTransportFeature.ConnectionAborted;
public override Task ConnectionClosed => ConnectionTransportFeature.ConnectionClosed;
struct FeatureInterfaces struct FeatureInterfaces
{ {
public IConnectionIdFeature ConnectionId; public IConnectionIdFeature ConnectionId;

View File

@ -1,8 +1,4 @@
using System; using System.IO.Pipelines;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Protocols.Features namespace Microsoft.AspNetCore.Protocols.Features
{ {
@ -17,9 +13,5 @@ namespace Microsoft.AspNetCore.Protocols.Features
IScheduler InputWriterScheduler { get; } IScheduler InputWriterScheduler { get; }
IScheduler OutputReaderScheduler { get; } IScheduler OutputReaderScheduler { get; }
Task ConnectionAborted { get; }
Task ConnectionClosed { get; }
} }
} }

View File

@ -60,10 +60,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public IScheduler OutputReaderScheduler => TaskRunScheduler.Default; public IScheduler OutputReaderScheduler => TaskRunScheduler.Default;
public Task ConnectionAborted => Task.CompletedTask;
public Task ConnectionClosed => Task.CompletedTask;
public string ConnectionId { get; set; } public string ConnectionId { get; set; }
} }
} }

View File

@ -990,6 +990,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
public async Task ConnectionClosesWhenFinReceivedBeforeRequestCompletes(ListenOptions listenOptions) public async Task ConnectionClosesWhenFinReceivedBeforeRequestCompletes(ListenOptions listenOptions)
{ {
var testContext = new TestServiceContext(); var testContext = new TestServiceContext();
// FIN callbacks are scheduled so run inline to make this test more reliable
testContext.ThreadPool = new InlineLoggingThreadPool(testContext.Log);
using (var server = new TestServer(TestApp.EchoAppChunked, testContext, listenOptions)) using (var server = new TestServer(TestApp.EchoAppChunked, testContext, listenOptions))
{ {

View File

@ -3,6 +3,7 @@
using System; using System;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols; using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Protocols.Features; using Microsoft.AspNetCore.Protocols.Features;