diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerPrimary.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerPrimary.cs index 9e40e6b549..d20ea8dd81 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerPrimary.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerPrimary.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.IO; using System.Runtime.InteropServices; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure; @@ -75,15 +76,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http { dispatchPipe.Init(Thread.Loop, Thread.QueueCloseHandle, true); pipe.Accept(dispatchPipe); + + // Ensure client sends "Kestrel" before adding pipe to _dispatchPipes. + var readContext = new PipeReadContext(this); + dispatchPipe.ReadStart( + (handle, status2, state) => ((PipeReadContext)state).AllocCallback(handle, status2), + (handle, status2, state) => ((PipeReadContext)state).ReadCallback(handle, status2), + readContext); } catch (UvException ex) { dispatchPipe.Dispose(); Log.LogError(0, ex, "ListenerPrimary.OnListenPipe"); - return; } - - _dispatchPipes.Add(dispatchPipe); } protected override void DispatchConnection(UvStreamHandle socket) @@ -179,5 +184,55 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http }, this).ConfigureAwait(false); } } + + private class PipeReadContext + { + private readonly ListenerPrimary _listener; + private readonly IntPtr _bufPtr; + private GCHandle _bufHandle; + private int _bytesRead; + + public PipeReadContext(ListenerPrimary listener) + { + _listener = listener; + _bufHandle = GCHandle.Alloc(new byte[8], GCHandleType.Pinned); + _bufPtr = _bufHandle.AddrOfPinnedObject(); + } + + public Libuv.uv_buf_t AllocCallback(UvStreamHandle dispatchPipe, int suggestedSize) + { + return dispatchPipe.Libuv.buf_init(_bufPtr + _bytesRead, 8 - _bytesRead); + } + + public unsafe void ReadCallback(UvStreamHandle dispatchPipe, int status) + { + try + { + dispatchPipe.Libuv.ThrowIfErrored(status); + + _bytesRead += status; + + if (_bytesRead == 8) + { + if (*(ulong*)_bufPtr == Constants.PipeMessage) + { + _listener._dispatchPipes.Add((UvPipeHandle) dispatchPipe); + dispatchPipe.ReadStop(); + _bufHandle.Free(); + } + else + { + throw new IOException("Bad data sent over Kestrel pipe."); + } + } + } + catch (Exception ex) + { + dispatchPipe.Dispose(); + _bufHandle.Free(); + _listener.Log.LogError(0, ex, "ListenerPrimary.ReadCallback"); + } + } + } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerSecondary.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerSecondary.cs index 3fd1db29be..9220aa4ddc 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerSecondary.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ListenerSecondary.cs @@ -17,6 +17,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http /// public abstract class ListenerSecondary : ListenerContext, IAsyncDisposable { + private static ArraySegment> _pipeMessage = + new ArraySegment>(new[] { new ArraySegment(BitConverter.GetBytes(Constants.PipeMessage)) }); + private string _pipeName; private IntPtr _ptr; private Libuv.uv_buf_t _buf; @@ -89,6 +92,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http return; } + var writeReq = new UvWriteReq(Log); + try { DispatchPipe.ReadStart( @@ -96,10 +101,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http (handle, status2, state) => ((ListenerSecondary)state).ReadStartCallback(handle, status2), this); - tcs.SetResult(0); + writeReq.Init(Thread.Loop); + writeReq.Write( + DispatchPipe, + _pipeMessage, + (req, status2, ex, state) => + { + req.Dispose(); + + if (ex != null) + { + tcs.SetException(ex); + } + else + { + tcs.SetResult(0); + } + }, + tcs); } catch (Exception ex) { + writeReq.Dispose(); DispatchPipe.Dispose(); tcs.SetException(ex); } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Infrastructure/Constants.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Infrastructure/Constants.cs index 77968bca17..9c9fbf73a7 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Infrastructure/Constants.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Infrastructure/Constants.cs @@ -20,6 +20,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure public const string ServerName = "Kestrel"; + // "Kestrel\0" + public const ulong PipeMessage = 0x006C65727473654B; + private static int? GetECONNRESET() { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Networking/UvWriteReq.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Networking/UvWriteReq.cs index 4a0aec28bc..93a06519a0 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Networking/UvWriteReq.cs @@ -85,18 +85,30 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Networking _callback = null; _state = null; Unpin(this); - - var block = start.Block; - for (var index = 0; index < nBuffers; index++) - { - block = block.Next; - } - throw; } } - public unsafe void Write2( + public void Write( + UvStreamHandle handle, + ArraySegment> bufs, + Action callback, + object state) + { + WriteArraySegmentInternal(handle, bufs, sendHandle: null, callback: callback, state: state); + } + + public void Write2( + UvStreamHandle handle, + ArraySegment> bufs, + UvStreamHandle sendHandle, + Action callback, + object state) + { + WriteArraySegmentInternal(handle, bufs, sendHandle, callback, state); + } + + private unsafe void WriteArraySegmentInternal( UvStreamHandle handle, ArraySegment> bufs, UvStreamHandle sendHandle, @@ -133,7 +145,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Networking _callback = callback; _state = state; - _uv.write2(this, handle, pBuffers, nBuffers, sendHandle, _uv_write_cb); + + if (sendHandle == null) + { + _uv.write(this, handle, pBuffers, nBuffers, _uv_write_cb); + } + else + { + _uv.write2(this, handle, pBuffers, nBuffers, sendHandle, _uv_write_cb); + } } catch { diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/ListenerPrimaryTests.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/ListenerPrimaryTests.cs new file mode 100644 index 0000000000..13be40ed2c --- /dev/null +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/ListenerPrimaryTests.cs @@ -0,0 +1,229 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Hosting.Server; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Features; +using Microsoft.AspNetCore.Server.Kestrel; +using Microsoft.AspNetCore.Server.Kestrel.Internal; +using Microsoft.AspNetCore.Server.Kestrel.Internal.Http; +using Microsoft.AspNetCore.Server.Kestrel.Internal.Networking; +using Microsoft.AspNetCore.Testing; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace Microsoft.AspNetCore.Server.KestrelTests +{ + public class ListenerPrimaryTests + { + [Fact] + public async Task ConnectionsGetRoundRobinedToSecondaryListeners() + { + var libuv = new Libuv(); + + var serviceContextPrimary = new TestServiceContext + { + FrameFactory = context => + { + return new Frame(new TestApplication(c => + { + return c.Response.WriteAsync("Primary"); + }), context); + } + }; + + var serviceContextSecondary = new ServiceContext + { + Log = serviceContextPrimary.Log, + AppLifetime = serviceContextPrimary.AppLifetime, + DateHeaderValueManager = serviceContextPrimary.DateHeaderValueManager, + ServerOptions = serviceContextPrimary.ServerOptions, + ThreadPool = serviceContextPrimary.ThreadPool, + FrameFactory = context => + { + return new Frame(new TestApplication(c => + { + return c.Response.WriteAsync("Secondary"); ; + }), context); + } + }; + + using (var kestrelEngine = new KestrelEngine(libuv, serviceContextPrimary)) + { + var address = ServerAddress.FromUrl("http://127.0.0.1:0/"); + var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n"); + + // Start primary listener + var kestrelThreadPrimary = new KestrelThread(kestrelEngine); + await kestrelThreadPrimary.StartAsync(); + var listenerPrimary = new TcpListenerPrimary(serviceContextPrimary); + await listenerPrimary.StartAsync(pipeName, address, kestrelThreadPrimary); + + // Until a secondary listener is added, TCP connections get dispatched directly + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + + // Add secondary listener + var kestrelThreadSecondary = new KestrelThread(kestrelEngine); + await kestrelThreadSecondary.StartAsync(); + var listenerSecondary = new TcpListenerSecondary(serviceContextSecondary); + await listenerSecondary.StartAsync(pipeName, address, kestrelThreadSecondary); + + // Once a secondary listener is added, TCP connections start getting dispatched to it + Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString())); + + // TCP connections will still get round-robined to the primary listener + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + + await listenerSecondary.DisposeAsync(); + await kestrelThreadSecondary.StopAsync(TimeSpan.FromSeconds(1)); + + await listenerPrimary.DisposeAsync(); + await kestrelThreadPrimary.StopAsync(TimeSpan.FromSeconds(1)); + } + } + + // https://github.com/aspnet/KestrelHttpServer/issues/1182 + [Fact] + public async Task NonListenerPipeConnectionsAreLoggedAndIgnored() + { + var libuv = new Libuv(); + + var primaryTrace = new TestKestrelTrace(); + + var serviceContextPrimary = new TestServiceContext + { + Log = primaryTrace, + FrameFactory = context => + { + return new Frame(new TestApplication(c => + { + return c.Response.WriteAsync("Primary"); + }), context); + } + }; + + var serviceContextSecondary = new ServiceContext + { + Log = new TestKestrelTrace(), + AppLifetime = serviceContextPrimary.AppLifetime, + DateHeaderValueManager = serviceContextPrimary.DateHeaderValueManager, + ServerOptions = serviceContextPrimary.ServerOptions, + ThreadPool = serviceContextPrimary.ThreadPool, + FrameFactory = context => + { + return new Frame(new TestApplication(c => + { + return c.Response.WriteAsync("Secondary"); ; + }), context); + } + }; + + using (var kestrelEngine = new KestrelEngine(libuv, serviceContextPrimary)) + { + var address = ServerAddress.FromUrl("http://127.0.0.1:0/"); + var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n"); + + // Start primary listener + var kestrelThreadPrimary = new KestrelThread(kestrelEngine); + await kestrelThreadPrimary.StartAsync(); + var listenerPrimary = new TcpListenerPrimary(serviceContextPrimary); + await listenerPrimary.StartAsync(pipeName, address, kestrelThreadPrimary); + + // Add secondary listener + var kestrelThreadSecondary = new KestrelThread(kestrelEngine); + await kestrelThreadSecondary.StartAsync(); + var listenerSecondary = new TcpListenerSecondary(serviceContextSecondary); + await listenerSecondary.StartAsync(pipeName, address, kestrelThreadSecondary); + + // TCP Connections get round-robined + Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + + // Create a pipe connection and keep it open without sending any data + var connectTcs = new TaskCompletionSource(); + var connectionTrace = new TestKestrelTrace(); + var pipe = new UvPipeHandle(connectionTrace); + + kestrelThreadPrimary.Post(_ => + { + var connectReq = new UvConnectRequest(connectionTrace); + + pipe.Init(kestrelThreadPrimary.Loop, kestrelThreadPrimary.QueueCloseHandle); + connectReq.Init(kestrelThreadPrimary.Loop); + + connectReq.Connect( + pipe, + pipeName, + (req, status, ex, __) => + { + req.Dispose(); + + if (ex != null) + { + connectTcs.SetException(ex); + } + else + { + connectTcs.SetResult(null); + } + }, + null); + }, null); + + await connectTcs.Task; + + // TCP connections will still get round-robined between only the two listeners + Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString())); + + await kestrelThreadPrimary.PostAsync(_ => pipe.Dispose(), null); + + // Same for after the non-listener pipe connection is closed + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address.ToString())); + Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address.ToString())); + + await listenerSecondary.DisposeAsync(); + await kestrelThreadSecondary.StopAsync(TimeSpan.FromSeconds(1)); + + await listenerPrimary.DisposeAsync(); + await kestrelThreadPrimary.StopAsync(TimeSpan.FromSeconds(1)); + } + + Assert.Equal(1, primaryTrace.Logger.TotalErrorsLogged); + var errorMessage = primaryTrace.Logger.Messages.First(m => m.LogLevel == LogLevel.Error); + Assert.Contains("EOF", errorMessage.Exception.ToString()); + } + + private class TestApplication : IHttpApplication + { + private readonly Func _app; + + public TestApplication(Func app) + { + _app = app; + } + + public DefaultHttpContext CreateContext(IFeatureCollection contextFeatures) + { + return new DefaultHttpContext(contextFeatures); + } + + public Task ProcessRequestAsync(DefaultHttpContext context) + { + return _app(context); + } + + public void DisposeContext(DefaultHttpContext context, Exception exception) + { + } + } + } +} diff --git a/test/shared/TestKestrelTrace.cs b/test/shared/TestKestrelTrace.cs index 814005d4d1..f9879bda37 100644 --- a/test/shared/TestKestrelTrace.cs +++ b/test/shared/TestKestrelTrace.cs @@ -1,5 +1,7 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + using Microsoft.AspNetCore.Server.Kestrel.Internal; -using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Testing { @@ -9,10 +11,13 @@ namespace Microsoft.AspNetCore.Testing { } - public TestKestrelTrace(ILogger testLogger) : base(testLogger) + public TestKestrelTrace(TestApplicationErrorLogger testLogger) : base(testLogger) { + Logger = testLogger; } + public TestApplicationErrorLogger Logger { get; private set; } + public override void ConnectionRead(string connectionId, int count) { //_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" recv {count} bytes.", connectionId, count);