diff --git a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs index d496d1d29d..d428722223 100644 --- a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs @@ -80,7 +80,8 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal public HttpTransportType TransportType { get; set; } - public SemaphoreSlim Lock { get; } = new SemaphoreSlim(1, 1); + public SemaphoreSlim WriteLock { get; } = new SemaphoreSlim(1, 1); + public SemaphoreSlim StateLock { get; } = new SemaphoreSlim(1, 1); // Used for testing only internal Task DisposeAndRemoveTask { get; set; } @@ -179,10 +180,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal { var disposeTask = Task.CompletedTask; + await StateLock.WaitAsync(); try { - await Lock.WaitAsync(); - if (Status == HttpConnectionStatus.Disposed) { disposeTask = _disposeTcs.Task; @@ -201,7 +201,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal } finally { - Lock.Release(); + StateLock.Release(); } await disposeTask; diff --git a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs index 0f4f88551f..55bae0f94a 100644 --- a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs @@ -188,10 +188,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal return; } + await connection.StateLock.WaitAsync(); try { - await connection.Lock.WaitAsync(); - if (connection.Status == HttpConnectionStatus.Disposed) { Log.ConnectionDisposed(_logger, connection.ConnectionId); @@ -263,7 +262,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal } finally { - connection.Lock.Release(); + connection.StateLock.Release(); } var resultTask = await Task.WhenAny(connection.ApplicationTask, connection.TransportTask); @@ -299,10 +298,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal if (pollAgain) { // Otherwise, we update the state to inactive again and wait for the next poll + await connection.StateLock.WaitAsync(); try { - await connection.Lock.WaitAsync(); - if (connection.Status == HttpConnectionStatus.Active) { // Mark the connection as inactive @@ -318,7 +316,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal } finally { - connection.Lock.Release(); + connection.StateLock.Release(); } } } @@ -329,10 +327,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal HttpContext context, HttpConnectionContext connection) { + await connection.StateLock.WaitAsync(); try { - await connection.Lock.WaitAsync(); - if (connection.Status == HttpConnectionStatus.Disposed) { Log.ConnectionDisposed(_logger, connection.ConnectionId); @@ -363,7 +360,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal } finally { - connection.Lock.Release(); + connection.StateLock.Release(); } // Wait for any of them to end @@ -468,9 +465,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal const int bufferSize = 4096; - // REVIEW: Consider spliting the connection lock into a read lock and a write lock - // Need to think about HttpConnectionContext.DisposeAsync and whether one or both locks would be needed - await connection.Lock.WaitAsync(); + await connection.WriteLock.WaitAsync(); try { @@ -493,7 +488,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal } finally { - connection.Lock.Release(); + connection.WriteLock.Release(); } } diff --git a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionManager.cs b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionManager.cs index 79b849120d..43e5982748 100644 --- a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionManager.cs +++ b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionManager.cs @@ -142,7 +142,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal DateTimeOffset lastSeenUtc; var connection = c.Value.Connection; - await connection.Lock.WaitAsync(); + await connection.StateLock.WaitAsync(); try { @@ -153,7 +153,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal } finally { - connection.Lock.Release(); + connection.StateLock.Release(); } // Once the decision has been made to dispose we don't check the status again diff --git a/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionDispatcherTests.cs b/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionDispatcherTests.cs index 9443099b05..139f4474ec 100644 --- a/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionDispatcherTests.cs +++ b/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionDispatcherTests.cs @@ -1390,7 +1390,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests connectionHandlerTask = dispatcher.ExecuteAsync(context, options, app); await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).AsTask().OrTimeout(); await connectionHandlerTask.OrTimeout(); - + Assert.Equal(StatusCodes.Status200OK, context.Response.StatusCode); Assert.Equal("Hello, World", GetContentAsString(context.Response.Body)); } @@ -1797,6 +1797,58 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests } } + [Fact] + public async Task LongPollingCanPollIfWritePipeHasBackpressure() + { + using (StartVerifableLog(out var loggerFactory, LogLevel.Debug)) + { + var manager = CreateConnectionManager(loggerFactory); + var pipeOptions = new PipeOptions(pauseWriterThreshold: 13, resumeWriterThreshold: 10); + var connection = manager.CreateConnection(pipeOptions, pipeOptions); + connection.TransportType = HttpTransportType.LongPolling; + + var dispatcher = new HttpConnectionDispatcher(manager, loggerFactory); + + var services = new ServiceCollection(); + services.AddSingleton(); + var builder = new ConnectionBuilder(services.BuildServiceProvider()); + builder.UseConnectionHandler(); + var app = builder.Build(); + var options = new HttpConnectionDispatcherOptions(); + + using (var responseBody = new MemoryStream()) + using (var requestBody = new MemoryStream()) + { + var context = new DefaultHttpContext(); + context.Request.Body = requestBody; + context.Response.Body = responseBody; + context.Request.Path = "/foo"; + context.Request.Method = "POST"; + var values = new Dictionary(); + values["id"] = connection.ConnectionId; + var qs = new QueryCollection(values); + context.Request.Query = qs; + var buffer = Encoding.UTF8.GetBytes("Hello, world"); + requestBody.Write(buffer, 0, buffer.Length); + requestBody.Seek(0, SeekOrigin.Begin); + + // Write some data to the pipe to fill it up and make the next write wait + await connection.ApplicationStream.WriteAsync(buffer, 0, buffer.Length).OrTimeout(); + + // This will block until the pipe is unblocked + var sendTask = dispatcher.ExecuteAsync(context, options, app).OrTimeout(); + Assert.False(sendTask.IsCompleted); + + var pollContext = MakeRequest("/foo", connection); + // This should unblock the send that is waiting because of backpressure + // Testing deadlock regression where pipe backpressure would hold the same lock that poll would use + await dispatcher.ExecuteAsync(pollContext, options, app).OrTimeout(); + + await sendTask.OrTimeout(); + } + } + } + private class RejectHandler : TestAuthenticationHandler { protected override bool ShouldAccept => false;