Wait for requests to drain during shutdown.

This commit is contained in:
Chris Ross 2014-05-30 15:54:03 -07:00
parent 60812e51f1
commit 9191bddf00
2 changed files with 52 additions and 6 deletions

View File

@ -39,6 +39,10 @@ namespace Microsoft.AspNet.Server.WebListener
private int _acceptorCounts;
private Action<object> _processRequest;
private bool _stopping;
private int _outstandingRequests;
private ManualResetEvent _shutdownSignal;
// TODO: private IDictionary<string, object> _capabilities;
internal MessagePump(Microsoft.Net.Server.WebListener listener, ILoggerFactory loggerFactory)
@ -49,6 +53,7 @@ namespace Microsoft.AspNet.Server.WebListener
_processRequest = new Action<object>(ProcessRequestAsync);
_maxAccepts = DefaultMaxAccepts;
_shutdownSignal = new ManualResetEvent(false);
}
internal Microsoft.Net.Server.WebListener Listener
@ -108,7 +113,7 @@ namespace Microsoft.AspNet.Server.WebListener
private async void ProcessRequestsWorker()
{
int workerIndex = Interlocked.Increment(ref _acceptorCounts);
while (_listener.IsListening && workerIndex <= MaxAccepts)
while (!_stopping && workerIndex <= MaxAccepts)
{
// Receive a request
RequestContext requestContext;
@ -124,7 +129,7 @@ namespace Microsoft.AspNet.Server.WebListener
}
try
{
Task.Factory.StartNew(_processRequest, requestContext);
Task ignored = Task.Factory.StartNew(_processRequest, requestContext);
}
catch (Exception ex)
{
@ -141,11 +146,18 @@ namespace Microsoft.AspNet.Server.WebListener
var requestContext = requestContextObj as RequestContext;
try
{
if (_stopping)
{
SetFatalResponse(requestContext, 503);
return;
}
try
{
Interlocked.Increment(ref _outstandingRequests);
FeatureContext featureContext = new FeatureContext(requestContext);
await _appFunc(featureContext.Features).SupressContext();
// TODO: WebSocket/Opaque upgrade - await requestContext.ProcessResponseAsync().SupressContext();
requestContext.Dispose();
}
catch (Exception ex)
{
@ -157,10 +169,16 @@ namespace Microsoft.AspNet.Server.WebListener
else
{
// We haven't sent a response yet, try to send a 500 Internal Server Error
SetFatalResponse(requestContext);
SetFatalResponse(requestContext, 500);
}
}
finally
{
if (Interlocked.Decrement(ref _outstandingRequests) == 0 && _stopping)
{
_shutdownSignal.Set();
}
}
requestContext.Dispose();
}
catch (Exception ex)
{
@ -169,16 +187,24 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
private static void SetFatalResponse(RequestContext context)
private static void SetFatalResponse(RequestContext context, int status)
{
context.Response.StatusCode = 500;
context.Response.StatusCode = status;
context.Response.ReasonPhrase = string.Empty;
context.Response.Headers.Clear();
context.Response.ContentLength = 0;
context.Dispose();
}
public void Dispose()
{
_stopping = true;
// Wait for active requests to drain
if (_outstandingRequests > 0)
{
_shutdownSignal.WaitOne();
}
// All requests are finished
_listener.Dispose();
}
}

View File

@ -80,6 +80,26 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
[Fact]
public async Task Server_ShutdownDurringRequest_Success()
{
Task<string> responseTask;
ManualResetEvent received = new ManualResetEvent(false);
using (Utilities.CreateHttpServer(env =>
{
received.Set();
var httpContext = new DefaultHttpContext((IFeatureCollection)env);
httpContext.Response.ContentLength = 11;
return httpContext.Response.WriteAsync("Hello World");
}))
{
responseTask = SendRequestAsync(Address);
Assert.True(received.WaitOne(10000));
}
string response = await responseTask;
Assert.Equal("Hello World", response);
}
[Fact]
public void Server_AppException_ClientReset()
{