Use libuv to track connections instead of ConcurrentDictionaries

- This means connections become untracked sooner than before and not all blocks will
  necessarily be returned.
- The assertion in the MemoryPoolBlock2 finalizer was weakened because FilteredStreamAdapter
  will continue to use blocks after libuv stops tracking the associated connection.
- Make 100% sure we don't accept new connections after we dispose the listen socket by using a flag.
- Add a (currently unused) AllowStop method to KestrelThread. This is meant to be called from
  listeners when we stop accepting new connections, but needs investigation to prevent flakiness.
This commit is contained in:
Stephen Halter 2016-02-10 15:27:32 -08:00
parent f4bb8d5eff
commit 53ecef0f98
13 changed files with 119 additions and 112 deletions

View File

@ -37,11 +37,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
public ISocketOutput SocketOutput { get; private set; }
public Task ReadInputAsync()
public void ReadInput()
{
_block = _memory.Lease();
// Use pooled block for copy
return _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) =>
_filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) =>
{
((FilteredStreamAdapter)state).OnStreamClose(task);
}, this);

View File

@ -33,7 +33,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private readonly object _stateLock = new object();
private ConnectionState _connectionState;
private TaskCompletionSource<object> _socketClosedTcs;
private Task _readFilteredInputTask = TaskUtilities.CompletedTask;
private IPEndPoint _remoteEndPoint;
private IPEndPoint _localEndPoint;
@ -41,14 +40,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
public Connection(ListenerContext context, UvStreamHandle socket) : base(context)
{
_socket = socket;
socket.Connection = this;
ConnectionControl = this;
_connectionId = Interlocked.Increment(ref _lastConnectionId);
_rawSocketInput = new SocketInput(Memory2, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
ConnectionManager.AddConnection(_connectionId, this);
}
// Internal for testing
@ -136,7 +134,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
switch (_connectionState)
{
case ConnectionState.SocketClosed:
return _readFilteredInputTask;
return TaskUtilities.CompletedTask;
case ConnectionState.CreatingFrame:
_connectionState = ConnectionState.ToDisconnect;
break;
@ -147,7 +145,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
_socketClosedTcs = new TaskCompletionSource<object>();
return Task.WhenAll(_socketClosedTcs.Task, _readFilteredInputTask);
return _socketClosedTcs.Task;
}
}
@ -194,19 +192,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
// call on the libuv thread.
_socketClosedTcs.TrySetResult(null);
}
if (_readFilteredInputTask.IsCompleted)
{
ConnectionManager.ConnectionStopped(_connectionId);
}
else
{
_readFilteredInputTask.ContinueWith((t, state) =>
{
var connection = (Connection)state;
connection.ConnectionManager.ConnectionStopped(connection._connectionId);
}, this);
}
}
}
@ -225,7 +210,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
SocketInput = filteredStreamAdapter.SocketInput;
SocketOutput = filteredStreamAdapter.SocketOutput;
_readFilteredInputTask = filteredStreamAdapter.ReadInputAsync();
filteredStreamAdapter.ReadInput();
}
else
{

View File

@ -0,0 +1,53 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Networking;
namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
public class ConnectionManager
{
private KestrelThread _thread;
private List<Task> _connectionStopTasks;
public ConnectionManager(KestrelThread thread)
{
_thread = thread;
}
// This must be called on the libuv event loop
public void WalkConnectionsAndClose()
{
if (_connectionStopTasks != null)
{
throw new InvalidOperationException(nameof(WalkConnectionsAndClose) + " cannot be called twice.");
}
_connectionStopTasks = new List<Task>();
_thread.Walk(ptr =>
{
var handle = UvMemory.FromIntPtr<UvHandle>(ptr);
var connection = (handle as UvStreamHandle)?.Connection;
if (connection != null)
{
_connectionStopTasks.Add(connection.StopAsync());
}
});
}
public Task WaitForConnectionCloseAsync()
{
if (_connectionStopTasks == null)
{
throw new InvalidOperationException(nameof(WalkConnectionsAndClose) + " must be called first.");
}
return Task.WhenAll(_connectionStopTasks);
}
}
}

View File

@ -13,6 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
/// </summary>
public abstract class Listener : ListenerContext, IAsyncDisposable
{
private bool _closed;
protected Listener(ServiceContext serviceContext)
: base(serviceContext)
{
@ -26,6 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
ServerAddress = address;
Thread = thread;
ConnectionManager = new ConnectionManager(thread);
var tcs = new TaskCompletionSource<int>(this);
@ -55,11 +58,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
protected static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state)
{
var listener = (Listener) state;
if (error != null)
{
listener.Log.LogError(0, error, "Listener.ConnectionCallback");
}
else
else if (!listener._closed)
{
listener.OnConnection(stream, status);
}
@ -90,14 +94,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
var listener = (Listener)state;
listener.ListenSocket.Dispose();
listener._closed = true;
listener.ConnectionManager.WalkConnectionsAndClose();
}, this);
await ConnectionManager.CloseConnectionsAsync();
await ConnectionManager.WaitForConnectionCloseAsync();
await Thread.PostAsync(state =>
{
var listener = (Listener)state;
var writeReqPool = listener.WriteReqPool;
var writeReqPool = ((Listener)state).WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();

View File

@ -17,7 +17,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
: base(serviceContext)
{
Memory2 = new MemoryPool2();
ConnectionManager = new ConnectionManager();
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
}

View File

@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private string _pipeName;
private IntPtr _ptr;
private Libuv.uv_buf_t _buf;
private bool _closed;
protected ListenerSecondary(ServiceContext serviceContext) : base(serviceContext)
{
@ -38,6 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
ServerAddress = address;
Thread = thread;
ConnectionManager = new ConnectionManager(thread);
DispatchPipe = new UvPipeHandle(Log);
@ -118,7 +120,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
return;
}
if (DispatchPipe.PendingCount() == 0)
if (_closed || DispatchPipe.PendingCount() == 0)
{
return;
}
@ -167,9 +169,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
var listener = (ListenerSecondary)state;
listener.DispatchPipe.Dispose();
listener.FreeBuffer();
listener._closed = true;
listener.ConnectionManager.WalkConnectionsAndClose();
}, this);
await ConnectionManager.CloseConnectionsAsync();
await ConnectionManager.WaitForConnectionCloseAsync();
await Thread.PostAsync(state =>
{

View File

@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32;
private static readonly WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
private static readonly Action<object> _connectionCancellation = (state) => ((SocketOutput)state).CancellationTriggered();
private readonly KestrelThread _thread;
@ -279,7 +280,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
if (blockToReturn != null)
{
ReturnBlocks(blockToReturn);
ThreadPool.QueueUserWorkItem(_returnBlocks, blockToReturn);
}
}

View File

@ -44,7 +44,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
acceptSocket.NoDelay(ServerInformation.NoDelay);
listenSocket.Accept(acceptSocket);
DispatchConnection(acceptSocket);
}
catch (UvException ex)
{

View File

@ -1,59 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Http;
namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure
{
public class ConnectionManager
{
private bool _managerClosed;
private ConcurrentDictionary<long, Connection> _activeConnections = new ConcurrentDictionary<long, Connection>();
public void AddConnection(long connectionId, Connection connection)
{
if (_managerClosed)
{
throw new InvalidOperationException(nameof(ConnectionManager) + " closed.");
}
if (!_activeConnections.TryAdd(connectionId, connection))
{
throw new InvalidOperationException("Connection already added.");
}
}
public void ConnectionStopped(long connectionId)
{
Connection removed;
_activeConnections.TryRemove(connectionId, out removed);
}
public Task CloseConnectionsAsync()
{
if (_managerClosed)
{
throw new InvalidOperationException(nameof(ConnectionManager) + " already closed.");
}
_managerClosed = true;
var stopTasks = new List<Task>();
foreach (var connectionId in _activeConnections.Keys)
{
Connection removed;
if (_activeConnections.TryRemove(connectionId, out removed))
{
stopTasks.Add(removed.StopAsync());
}
}
return Task.WhenAll(stopTasks);
}
}
}

View File

@ -7,7 +7,6 @@ using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Http;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Networking;
using Microsoft.Extensions.Logging;
@ -72,6 +71,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel
return tcs.Task;
}
// This must be called from the libuv event loop.
public void AllowStop()
{
_post.Unreference();
}
public void Stop(TimeSpan timeout)
{
if (!_initCompleted)
@ -79,19 +84,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel
return;
}
var stepTimeout = (int)(timeout.TotalMilliseconds / 2);
Post(t => t.OnStop());
if (!_thread.Join(stepTimeout))
if (_thread.IsAlive)
{
var stepTimeout = (int)(timeout.TotalMilliseconds / 2);
try
{
Post(t => t.OnStopImmediate());
Post(t => t.OnStopRude());
if (!_thread.Join(stepTimeout))
{
Post(t => t.OnStopImmediate());
if (!_thread.Join(stepTimeout))
{
#if NET451
_thread.Abort();
_thread.Abort();
#endif
}
}
}
catch (ObjectDisposedException)
@ -113,23 +120,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel
}
}
private void OnStop()
private void OnStopRude()
{
// If the listeners were all disposed gracefully there should be no handles
// left to dispose other than _post.
// We dispose everything here in the event they are not closed gracefully.
_engine.Libuv.walk(
_loop,
(ptr, arg) =>
Walk(ptr =>
{
var handle = UvMemory.FromIntPtr<UvHandle>(ptr);
if (handle != _post)
{
var handle = UvMemory.FromIntPtr<UvHandle>(ptr);
if (handle != _post)
{
handle.Dispose();
}
},
IntPtr.Zero);
handle.Dispose();
}
});
// uv_unref is idempotent so it's OK to call this here and in AllowStop.
_post.Unreference();
}
@ -175,6 +177,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel
return tcs.Task;
}
public void Walk(Action<IntPtr> callback)
{
_engine.Libuv.walk(
_loop,
(ptr, arg) =>
{
callback(ptr);
},
IntPtr.Zero);
}
private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
{
EnqueueCloseHandle(callback, handle);

View File

@ -138,7 +138,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure
/// <param name="block">The block to return. It must have been acquired by calling Lease on the same memory pool instance.</param>
public void Return(MemoryPoolBlock2 block)
{
Debug.Assert(block.Pool == this, "Returned block was leased from this pool");
Debug.Assert(block.Pool == this, "Returned block was not leased from this pool");
if (block.Slab != null && block.Slab.IsActive)
{

View File

@ -77,7 +77,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Infrastructure
~MemoryPoolBlock2()
{
Debug.Assert(!_pinHandle.IsAllocated, "Ad-hoc memory block wasn't unpinned");
Debug.Assert(Slab == null, "Block being garbage collected instead of returned to pool");
Debug.Assert(Slab == null || !Slab.IsActive, "Block being garbage collected instead of returned to pool");
if (_pinHandle.IsAllocated)
{

View File

@ -3,6 +3,7 @@
using System;
using System.Runtime.InteropServices;
using Microsoft.AspNetCore.Server.Kestrel.Http;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
using Microsoft.Extensions.Logging;
@ -28,6 +29,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
{
}
public Connection Connection { get; set; }
protected override bool ReleaseHandle()
{
if (_listenVitality.IsAllocated)