Fix OnCompleted and implement CompleteAsync (#24686)

This commit is contained in:
Justin Kotalik 2020-08-14 14:16:05 -07:00 committed by GitHub
parent 9b9d46d01d
commit 91776832e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 367 additions and 65 deletions

View File

@ -195,18 +195,56 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
Task IHttpResponseBodyFeature.SendFileAsync(string path, long offset, long? count, CancellationToken cancellation)
=> SendFileFallback.SendFileAsync(ResponseBody, path, offset, count, cancellation);
Task IHttpResponseBodyFeature.CompleteAsync() => CompleteResponseBodyAsync();
// TODO: In the future this could complete the body all the way down to the server. For now it just ensures
// any unflushed data gets flushed.
protected Task CompleteResponseBodyAsync()
Task IHttpResponseBodyFeature.CompleteAsync()
{
if (ResponsePipeWrapper != null)
{
return ResponsePipeWrapper.CompleteAsync().AsTask();
var completeAsyncValueTask = ResponsePipeWrapper.CompleteAsync();
if (!completeAsyncValueTask.IsCompletedSuccessfully)
{
return CompleteResponseBodyAwaited(completeAsyncValueTask);
}
completeAsyncValueTask.GetAwaiter().GetResult();
}
return Task.CompletedTask;
if (!HasResponseStarted)
{
var initializeTask = InitializeResponse(flushHeaders: false);
if (!initializeTask.IsCompletedSuccessfully)
{
return CompleteInitializeResponseAwaited(initializeTask);
}
}
// Completing the body output will trigger a final flush to IIS.
// We'd rather not bypass the bodyoutput to flush, to guarantee we avoid
// calling flush twice at the same time.
// awaiting the writeBodyTask guarantees the response has finished the final flush.
_bodyOutput.Complete();
return _writeBodyTask;
}
private async Task CompleteResponseBodyAwaited(ValueTask completeAsyncTask)
{
await completeAsyncTask;
if (!HasResponseStarted)
{
await InitializeResponse(flushHeaders: false);
}
_bodyOutput.Complete();
await _writeBodyTask;
}
private async Task CompleteInitializeResponseAwaited(Task initializeTask)
{
await initializeTask;
_bodyOutput.Complete();
await _writeBodyTask;
}
bool IHttpUpgradeFeature.IsUpgradableRequest => true;

View File

@ -175,6 +175,9 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
SetResponseTrailers();
}
// Done with response, say there is no more data after writing trailers.
await AsyncIO.FlushAsync(moreData: false);
break;
}
@ -228,7 +231,7 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
Log.ConnectionDisconnect(_logger, ((IHttpConnectionFeature)this).ConnectionId);
}
_bodyOutput.Dispose();
_bodyOutput.Complete();
if (shouldScheduleCancellation)
{

View File

@ -288,7 +288,7 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
if (!canHaveNonEmptyBody)
{
_bodyOutput.Dispose();
_bodyOutput.Complete();
}
else
{
@ -664,7 +664,6 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
// Post completion after completing the request to resume the state machine
PostCompletion(ConvertRequestCompletionResults(successfulRequest));
// Dispose the context
Dispose();
}

View File

@ -26,31 +26,36 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
public override async Task<bool> ProcessRequestAsync()
{
InitializeContext();
var context = default(TContext);
var success = true;
try
{
context = _application.CreateContext(this);
InitializeContext();
try
{
context = _application.CreateContext(this);
await _application.ProcessRequestAsync(context);
}
catch (BadHttpRequestException ex)
{
SetBadRequestState(ex);
ReportApplicationError(ex);
success = false;
}
catch (Exception ex)
{
ReportApplicationError(ex);
success = false;
}
if (ResponsePipeWrapper != null)
{
await ResponsePipeWrapper.CompleteAsync();
}
await _application.ProcessRequestAsync(context);
}
catch (BadHttpRequestException ex)
{
SetBadRequestState(ex);
ReportApplicationError(ex);
success = false;
}
catch (Exception ex)
{
ReportApplicationError(ex);
success = false;
}
finally
{
await CompleteResponseBodyAsync();
_streams.Stop();
if (!HasResponseStarted && _applicationException == null && _onStarting != null)
@ -66,38 +71,20 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
SetResetCode(2);
}
if (_onCompleted != null)
if (!_requestAborted)
{
await FireOnCompleted();
await ProduceEnd();
}
else if (!HasResponseStarted && _requestRejectedException == null)
{
// If the request was aborted and no response was sent, there's no
// meaningful status code to log.
StatusCode = 0;
success = false;
}
}
if (!_requestAborted)
{
await ProduceEnd();
}
else if (!HasResponseStarted && _requestRejectedException == null)
{
// If the request was aborted and no response was sent, there's no
// meaningful status code to log.
StatusCode = 0;
success = false;
}
try
{
_application.DisposeContext(context, _applicationException);
}
catch (Exception ex)
{
// TODO Log this
_applicationException = _applicationException ?? ex;
success = false;
}
finally
{
// Complete response writer and request reader pipe sides
_bodyOutput.Dispose();
_bodyOutput.Complete();
_bodyInputPipe?.Reader.Complete();
// Allow writes to drain
@ -107,13 +94,34 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
}
// Cancel all remaining IO, there might be reads pending if not entire request body was sent by client
AsyncIO?.Dispose();
AsyncIO?.Complete();
if (_readBodyTask != null)
{
await _readBodyTask;
}
}
catch (Exception ex)
{
success = false;
ReportApplicationError(ex);
}
finally
{
if (_onCompleted != null)
{
await FireOnCompleted();
}
try
{
_application.DisposeContext(context, _applicationException);
}
catch (Exception ex)
{
ReportApplicationError(ex);
}
}
return success;
}
}

View File

@ -89,7 +89,6 @@ namespace Microsoft.AspNetCore.Server.IIS.Core.IO
}
}
public ValueTask FlushAsync(bool moreData)
{
var flush = GetFlushOperation();
@ -137,7 +136,7 @@ namespace Microsoft.AspNetCore.Server.IIS.Core.IO
nextContinuation?.Invoke();
}
public void Dispose()
public void Complete()
{
lock (_contextSync)
{

View File

@ -7,11 +7,12 @@ using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.IIS.Core.IO
{
internal interface IAsyncIOEngine: IDisposable
internal interface IAsyncIOEngine
{
ValueTask<int> ReadAsync(Memory<byte> memory);
ValueTask<int> WriteAsync(ReadOnlySequence<byte> data);
ValueTask FlushAsync(bool moreData);
void NotifyCompletion(int hr, int bytes);
void Complete();
}
}

View File

@ -110,7 +110,7 @@ namespace Microsoft.AspNetCore.Server.IIS.Core.IO
}
}
public void Dispose()
public void Complete()
{
lock (_contextLock)
{

View File

@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Server.IIS.Core
return FlushAsync(_pipe.Writer, cancellationToken);
}
public void Dispose()
public void Complete()
{
lock (_contextLock)
{

View File

@ -354,6 +354,79 @@ namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests
.Build().RunAsync();
}
[ConditionalFact]
[MinimumOSVersion(OperatingSystems.Windows, "10.0.19529", SkipReason = "Reset support was added in Win10_20H2.")]
public async Task Reset_AfterCompleteAsync_NoReset()
{
var deploymentParameters = GetHttpsDeploymentParameters();
var deploymentResult = await DeployAsync(deploymentParameters);
await new HostBuilder()
.UseHttp2Cat(deploymentResult.ApplicationBaseUri, async h2Connection =>
{
await h2Connection.InitializeConnectionAsync();
h2Connection.Logger.LogInformation("Initialized http2 connection. Starting stream 1.");
await h2Connection.StartStreamAsync(1, GetHeaders("/Reset_AfterCompleteAsync_NoReset"), endStream: true);
await h2Connection.ReceiveHeadersAsync(1, decodedHeaders =>
{
Assert.Equal("200", decodedHeaders[HeaderNames.Status]);
});
var dataFrame = await h2Connection.ReceiveFrameAsync();
Http2Utilities.VerifyDataFrame(dataFrame, 1, endOfStream: false, length: 11);
dataFrame = await h2Connection.ReceiveFrameAsync();
Http2Utilities.VerifyDataFrame(dataFrame, 1, endOfStream: true, length: 0);
h2Connection.Logger.LogInformation("Connection stopped.");
})
.Build().RunAsync();
}
[ConditionalFact]
[MinimumOSVersion(OperatingSystems.Windows, "10.0.19529", SkipReason = "Reset support was added in Win10_20H2.")]
public async Task Reset_CompleteAsyncDuringRequestBody_Resets()
{
var deploymentParameters = GetHttpsDeploymentParameters();
var deploymentResult = await DeployAsync(deploymentParameters);
await new HostBuilder()
.UseHttp2Cat(deploymentResult.ApplicationBaseUri, async h2Connection =>
{
await h2Connection.InitializeConnectionAsync();
h2Connection.Logger.LogInformation("Initialized http2 connection. Starting stream 1.");
await h2Connection.StartStreamAsync(1, Http2Utilities.PostRequestHeaders, endStream: false);
await h2Connection.SendDataAsync(1, new byte[10], endStream: false);
await h2Connection.ReceiveHeadersAsync(1, decodedHeaders =>
{
Assert.Equal("200", decodedHeaders[HeaderNames.Status]);
});
var dataFrame = await h2Connection.ReceiveFrameAsync();
if (Environment.OSVersion.Version >= Win10_Regressed_DataFrame)
{
// TODO: Remove when the regression is fixed.
// https://github.com/dotnet/aspnetcore/issues/23164#issuecomment-652646163
Http2Utilities.VerifyDataFrame(dataFrame, 1, endOfStream: false, length: 0);
dataFrame = await h2Connection.ReceiveFrameAsync();
}
Http2Utilities.VerifyDataFrame(dataFrame, 1, endOfStream: true, length: 0);
var resetFrame = await h2Connection.ReceiveFrameAsync();
Http2Utilities.VerifyResetFrame(resetFrame, expectedStreamId: 1, expectedErrorCode: Http2ErrorCode.NO_ERROR);
h2Connection.Logger.LogInformation("Connection stopped.");
})
.Build().RunAsync();
}
private static List<KeyValuePair<string, string>> GetHeaders(string path)
{
var headers = Headers.ToList();
@ -372,7 +445,6 @@ namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests
return headers;
}
private IISDeploymentParameters GetHttpsDeploymentParameters()
{
var port = TestPortHelper.GetNextSSLPort();

View File

@ -30,5 +30,26 @@ namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests.InProcess
{
Assert.Equal(20, (await _fixture.Client.GetByteArrayAsync($"/FlushedPipeAndThenUnflushedPipe")).Length);
}
[ConditionalFact]
[RequiresNewHandler]
public async Task ResponseBodyTest_GettingHttpContextFieldsWork()
{
Assert.Equal("SlowOnCompleted", await _fixture.Client.GetStringAsync($"/OnCompletedHttpContext"));
Assert.Equal("", await _fixture.Client.GetStringAsync($"/OnCompletedHttpContext_Completed"));
}
[ConditionalFact]
[RequiresNewHandler]
public async Task ResponseBodyTest_CompleteAsyncWorks()
{
// The app func for CompleteAsync will not finish until CompleteAsync_Completed is sent.
// This verifies that the response is sent to the client with CompleteAsync
var response = await _fixture.Client.GetAsync("/CompleteAsync");
Assert.True(response.IsSuccessStatusCode);
var response2 = await _fixture.Client.GetAsync("/CompleteAsync_Completed");
Assert.True(response2.IsSuccessStatusCode);
}
}
}

View File

@ -28,8 +28,6 @@ namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests
[MinimumOSVersion(OperatingSystems.Windows, WindowsVersionForTrailers)]
public async Task ResponseTrailers_HTTP2_TrailersAvailable()
{
var version = System.Environment.OSVersion.Version;
var deploymentParameters = GetHttpsDeploymentParameters();
var deploymentResult = await DeployAsync(deploymentParameters);
@ -177,6 +175,45 @@ namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests
Assert.Equal(new[] { "TrailerValue0", "TrailerValue1" }, response.TrailingHeaders.GetValues("TrailerName"));
}
[ConditionalFact]
[MinimumOSVersion(OperatingSystems.Windows, WindowsVersionForTrailers)]
public async Task ResponseTrailers_CompleteAsyncNoBody_TrailersSent()
{
var deploymentParameters = GetHttpsDeploymentParameters();
var deploymentResult = await DeployAsync(deploymentParameters);
// The app func for CompleteAsync will not finish until CompleteAsync_Completed is sent.
// This verifies that the response is sent to the client with CompleteAsync
var response = await SendRequestAsync(deploymentResult.HttpClient.BaseAddress.ToString() + "ResponseTrailers_CompleteAsyncNoBody_TrailersSent");
response.EnsureSuccessStatusCode();
Assert.Equal(HttpVersion.Version20, response.Version);
Assert.NotEmpty(response.TrailingHeaders);
Assert.Equal("TrailerValue", response.TrailingHeaders.GetValues("TrailerName").Single());
var response2 = await SendRequestAsync(deploymentResult.HttpClient.BaseAddress.ToString() + "ResponseTrailers_CompleteAsyncNoBody_TrailersSent_Completed");
Assert.True(response2.IsSuccessStatusCode);
}
[ConditionalFact]
[MinimumOSVersion(OperatingSystems.Windows, WindowsVersionForTrailers)]
public async Task ResponseTrailers_CompleteAsyncWithBody_TrailersSent()
{
var deploymentParameters = GetHttpsDeploymentParameters();
var deploymentResult = await DeployAsync(deploymentParameters);
// The app func for CompleteAsync will not finish until CompleteAsync_Completed is sent.
// This verifies that the response is sent to the client with CompleteAsync
var response = await SendRequestAsync(deploymentResult.HttpClient.BaseAddress.ToString() + "ResponseTrailers_CompleteAsyncWithBody_TrailersSent");
response.EnsureSuccessStatusCode();
Assert.Equal(HttpVersion.Version20, response.Version);
Assert.Equal("Hello World", await response.Content.ReadAsStringAsync());
Assert.NotEmpty(response.TrailingHeaders);
Assert.Equal("Trailer Value", response.TrailingHeaders.GetValues("TrailerName").Single());
var response2 = await SendRequestAsync(deploymentResult.HttpClient.BaseAddress.ToString() + "ResponseTrailers_CompleteAsyncWithBody_TrailersSent_Completed");
Assert.True(response2.IsSuccessStatusCode);
}
private IISDeploymentParameters GetHttpsDeploymentParameters()
{
var port = TestPortHelper.GetNextSSLPort();

View File

@ -35,19 +35,22 @@ namespace TestSite
public partial class Startup
{
public static bool StartupHookCalled;
private IHttpContextAccessor _httpContextAccessor;
public void Configure(IApplicationBuilder app)
public void Configure(IApplicationBuilder app, IHttpContextAccessor httpContextAccessor)
{
if (Environment.GetEnvironmentVariable("ENABLE_HTTPS_REDIRECTION") != null)
{
app.UseHttpsRedirection();
}
TestStartup.Register(app, this);
_httpContextAccessor = httpContextAccessor;
}
public void ConfigureServices(IServiceCollection serviceCollection)
{
serviceCollection.AddResponseCompression();
serviceCollection.AddHttpContextAccessor();
}
#if FORWARDCOMPAT
private async Task ContentRootPath(HttpContext ctx) => await ctx.Response.WriteAsync(ctx.RequestServices.GetService<Microsoft.AspNetCore.Hosting.IHostingEnvironment>().ContentRootPath);
@ -1315,11 +1318,132 @@ namespace TestSite
}
}
public Task Goaway(HttpContext httpContext)
{
httpContext.Response.Headers["Connection"] = "close";
return Task.CompletedTask;
}
private TaskCompletionSource _completeAsync = new TaskCompletionSource();
public async Task CompleteAsync(HttpContext httpContext)
{
await httpContext.Response.CompleteAsync();
await _completeAsync.Task;
}
public Task CompleteAsync_Completed(HttpContext httpContext)
{
_completeAsync.TrySetResult();
return Task.CompletedTask;
}
public async Task Reset_DuringRequestBody_Resets_Complete(HttpContext httpContext)
{
await _resetDuringRequestBodyResetsCts.Task;
}
private TaskCompletionSource<object> _onCompletedHttpContext = new TaskCompletionSource<object>();
public async Task OnCompletedHttpContext(HttpContext context)
{
// This shouldn't block the response or the server from shutting down.
context.Response.OnCompleted(async () =>
{
var context = _httpContextAccessor.HttpContext;
await Task.Delay(500);
// Access all fields of the connection after final flush.
try
{
_ = context.Connection.RemoteIpAddress;
_ = context.Connection.LocalIpAddress;
_ = context.Connection.Id;
_ = context.Connection.ClientCertificate;
_ = context.Connection.LocalPort;
_ = context.Connection.RemotePort;
_ = context.Request.ContentLength;
_ = context.Request.Headers;
_ = context.Request.Query;
_ = context.Request.Body;
_ = context.Request.ContentType;
_ = context.Response.StatusCode;
_ = context.Response.Body;
_ = context.Response.Headers;
_ = context.Response.ContentType;
}
catch (Exception ex)
{
_onCompletedHttpContext.TrySetResult(ex);
}
_onCompletedHttpContext.TrySetResult(null);
});
await context.Response.WriteAsync("SlowOnCompleted");
}
public async Task OnCompletedHttpContext_Completed(HttpContext httpContext)
{
await _onCompletedHttpContext.Task;
}
private TaskCompletionSource<object> _responseTrailers_CompleteAsyncNoBody_TrailersSent = new TaskCompletionSource<object>();
public async Task ResponseTrailers_CompleteAsyncNoBody_TrailersSent(HttpContext httpContext)
{
httpContext.Response.AppendTrailer("trailername", "TrailerValue");
await httpContext.Response.CompleteAsync();
await _responseTrailers_CompleteAsyncNoBody_TrailersSent.Task;
}
public Task ResponseTrailers_CompleteAsyncNoBody_TrailersSent_Completed(HttpContext httpContext)
{
_responseTrailers_CompleteAsyncNoBody_TrailersSent.TrySetResult(null);
return Task.CompletedTask;
}
private TaskCompletionSource<object> _responseTrailers_CompleteAsyncWithBody_TrailersSent = new TaskCompletionSource<object>();
public async Task ResponseTrailers_CompleteAsyncWithBody_TrailersSent(HttpContext httpContext)
{
await httpContext.Response.WriteAsync("Hello World");
httpContext.Response.AppendTrailer("TrailerName", "Trailer Value");
await httpContext.Response.CompleteAsync();
await _responseTrailers_CompleteAsyncWithBody_TrailersSent.Task;
}
public Task ResponseTrailers_CompleteAsyncWithBody_TrailersSent_Completed(HttpContext httpContext)
{
_responseTrailers_CompleteAsyncWithBody_TrailersSent.TrySetResult(null);
return Task.CompletedTask;
}
public async Task Reset_AfterCompleteAsync_NoReset(HttpContext httpContext)
{
Assert.Equal("HTTP/2", httpContext.Request.Protocol);
var feature = httpContext.Features.Get<IHttpResetFeature>();
Assert.NotNull(feature);
await httpContext.Response.WriteAsync("Hello World");
await httpContext.Response.CompleteAsync();
// The request and response are fully complete, the reset doesn't get sent.
feature.Reset(1111);
}
public async Task Reset_CompleteAsyncDuringRequestBody_Resets(HttpContext httpContext)
{
Assert.Equal("HTTP/2", httpContext.Request.Protocol);
var feature = httpContext.Features.Get<IHttpResetFeature>();
Assert.NotNull(feature);
var read = await httpContext.Request.Body.ReadAsync(new byte[10], 0, 10);
Assert.Equal(10, read);
var readTask = httpContext.Request.Body.ReadAsync(new byte[10], 0, 10);
await httpContext.Response.CompleteAsync();
feature.Reset((int)0); // GRPC does this
await Assert.ThrowsAsync<IOException>(() => readTask);
}
internal static readonly HashSet<(string, StringValues, StringValues)> NullTrailers = new HashSet<(string, StringValues, StringValues)>()
{
("NullString", (string)null, (string)null),