Make first write use the same BufferWriter (#7505)

This commit is contained in:
Justin Kotalik 2019-02-15 08:26:57 -08:00 committed by GitHub
parent 98ad532822
commit 25f1f59378
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 214 additions and 84 deletions

View File

@ -236,27 +236,32 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
if (_advancedBytesForChunk > 0 || buffer.Length > 0)
{
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
if (_advancedBytesForChunk > 0)
{
WriteCurrentMemoryToPipeWriter(ref writer);
}
if (buffer.Length > 0)
{
writer.WriteBeginChunkBytes(buffer.Length);
writer.Write(buffer);
writer.WriteEndChunkBytes();
}
writer.Commit();
_unflushedBytes += writer.BytesCommitted;
CommitChunkInternal(ref writer, buffer);
}
}
return FlushAsync(cancellationToken);
}
private void CommitChunkInternal(ref BufferWriter<PipeWriter> writer, ReadOnlySpan<byte> buffer)
{
if (_advancedBytesForChunk > 0)
{
WriteCurrentMemoryToPipeWriter(ref writer);
}
if (buffer.Length > 0)
{
writer.WriteBeginChunkBytes(buffer.Length);
writer.Write(buffer);
writer.WriteEndChunkBytes();
}
writer.Commit();
_unflushedBytes += writer.BytesCommitted;
}
public void WriteResponseHeaders(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk)
{
lock (_contextLock)
@ -268,20 +273,24 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
var buffer = _pipeWriter;
var writer = new BufferWriter<PipeWriter>(buffer);
writer.Write(HttpVersion11Bytes);
var statusBytes = ReasonPhrases.ToStatusBytes(statusCode, reasonPhrase);
writer.Write(statusBytes);
responseHeaders.CopyTo(ref writer);
writer.Write(EndHeadersBytes);
writer.Commit();
_unflushedBytes += writer.BytesCommitted;
_autoChunk = autoChunk;
WriteResponseHeadersInternal(ref writer, statusCode, reasonPhrase, responseHeaders, autoChunk);
}
}
private void WriteResponseHeadersInternal(ref BufferWriter<PipeWriter> writer, int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk)
{
writer.Write(HttpVersion11Bytes);
var statusBytes = ReasonPhrases.ToStatusBytes(statusCode, reasonPhrase);
writer.Write(statusBytes);
responseHeaders.CopyTo(ref writer);
writer.Write(EndHeadersBytes);
writer.Commit();
_unflushedBytes += writer.BytesCommitted;
_autoChunk = autoChunk;
}
public void Dispose()
{
lock (_contextLock)
@ -338,6 +347,44 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return WriteAsync(ContinueBytes);
}
public ValueTask<FlushResult> FirstWriteAsync(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk, ReadOnlySpan<byte> buffer, CancellationToken cancellationToken)
{
lock (_contextLock)
{
if (_pipeWriterCompleted)
{
return default;
}
// Uses same BufferWriter to write response headers and response
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
WriteResponseHeadersInternal(ref writer, statusCode, reasonPhrase, responseHeaders, autoChunk);
return WriteAsyncInternal(ref writer, buffer, cancellationToken);
}
}
public ValueTask<FlushResult> FirstWriteChunkedAsync(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk, ReadOnlySpan<byte> buffer, CancellationToken cancellationToken)
{
lock (_contextLock)
{
if (_pipeWriterCompleted)
{
return default;
}
// Uses same BufferWriter to write response headers and chunk
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
WriteResponseHeadersInternal(ref writer, statusCode, reasonPhrase, responseHeaders, autoChunk);
CommitChunkInternal(ref writer, buffer);
return FlushAsync(cancellationToken);
}
}
private ValueTask<FlushResult> WriteAsync(
ReadOnlySpan<byte> buffer,
CancellationToken cancellationToken = default)
@ -350,38 +397,47 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
}
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
if (_autoChunk)
{
if (_advancedBytesForChunk > 0)
{
// If there is data that was chunked before writing (ex someone did GetMemory->Advance->WriteAsync)
// make sure to write whatever was advanced first
WriteCurrentMemoryToPipeWriter(ref writer);
}
else
{
// If there is an empty write, we still need to update the current chunk
_currentChunkMemoryUpdated = false;
}
}
if (buffer.Length > 0)
{
writer.Write(buffer);
}
writer.Commit();
var bytesWritten = _unflushedBytes + writer.BytesCommitted;
_unflushedBytes = 0;
return _flusher.FlushAsync(
_minResponseDataRateFeature.MinDataRate,
bytesWritten,
this,
cancellationToken);
return WriteAsyncInternal(ref writer, buffer, cancellationToken);
}
}
private ValueTask<FlushResult> WriteAsyncInternal(
ref BufferWriter<PipeWriter> writer,
ReadOnlySpan<byte> buffer,
CancellationToken cancellationToken = default)
{
if (_autoChunk)
{
if (_advancedBytesForChunk > 0)
{
// If there is data that was chunked before writing (ex someone did GetMemory->Advance->WriteAsync)
// make sure to write whatever was advanced first
WriteCurrentMemoryToPipeWriter(ref writer);
}
else
{
// If there is an empty write, we still need to update the current chunk
_currentChunkMemoryUpdated = false;
}
}
if (buffer.Length > 0)
{
writer.Write(buffer);
}
writer.Commit();
var bytesWritten = _unflushedBytes + writer.BytesCommitted;
_unflushedBytes = 0;
return _flusher.FlushAsync(
_minResponseDataRateFeature.MinDataRate,
bytesWritten,
this,
cancellationToken);
}
// These methods are for chunked http responses that use GetMemory/Advance
private Memory<byte> GetChunkedMemory(int sizeHint)
{

View File

@ -283,7 +283,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
ApplicationAbort();
}
Task IHttpResponseStartFeature.StartAsync(CancellationToken cancellationToken)
{
if (HasResponseStarted)

View File

@ -893,12 +893,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return InitializeResponseAwaited(startingTask, firstWriteByteCount);
}
if (_applicationException != null)
{
ThrowResponseAbortedException();
}
VerifyInitializeState(firstWriteByteCount);
VerifyAndUpdateWrite(firstWriteByteCount);
ProduceStart(appCompleted: false);
return Task.CompletedTask;
@ -909,15 +905,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
await startingTask;
if (_applicationException != null)
{
ThrowResponseAbortedException();
}
VerifyInitializeState(firstWriteByteCount);
VerifyAndUpdateWrite(firstWriteByteCount);
ProduceStart(appCompleted: false);
}
private HttpResponseHeaders InitializeResponseFirstWrite(int firstWriteByteCount)
{
VerifyInitializeState(firstWriteByteCount);
var responseHeaders = CreateResponseHeaders(appCompleted: false);
// InitializeResponse can only be called if we are just about to Flush the headers
_requestProcessingStatus = RequestProcessingStatus.HeadersFlushed;
return responseHeaders;
}
private void ProduceStart(bool appCompleted)
{
if (HasResponseStarted)
@ -927,7 +931,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_requestProcessingStatus = RequestProcessingStatus.HeadersCommitted;
CreateResponseHeader(appCompleted);
var responseHeaders = CreateResponseHeaders(appCompleted);
Output.WriteResponseHeaders(StatusCode, ReasonPhrase, responseHeaders, _autoChunk);
}
private void VerifyInitializeState(int firstWriteByteCount)
{
if (_applicationException != null)
{
ThrowResponseAbortedException();
}
VerifyAndUpdateWrite(firstWriteByteCount);
}
protected Task TryProduceInvalidRequestResponse()
@ -1022,7 +1038,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
}
}
private void CreateResponseHeader(bool appCompleted)
private HttpResponseHeaders CreateResponseHeaders(bool appCompleted)
{
var responseHeaders = HttpResponseHeaders;
var hasConnection = responseHeaders.HasConnection;
@ -1114,7 +1130,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
responseHeaders.SetRawDate(dateHeaderValues.String, dateHeaderValues.Bytes);
}
Output.WriteResponseHeaders(StatusCode, ReasonPhrase, responseHeaders, _autoChunk);
return responseHeaders;
}
private bool CanWriteResponseBody()
@ -1264,7 +1280,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
HandleNonBodyResponseWrite();
// For HEAD requests, we still use the number of bytes written for logging
// how many bytes were written.
// how many bytes were written.
VerifyAndUpdateWrite(bytes);
}
}
@ -1330,18 +1346,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public ValueTask<FlushResult> WritePipeAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
// For the first write, ensure headers are flushed if WriteDataAsync isn't called.
var firstWrite = !HasResponseStarted;
if (firstWrite)
if (!HasResponseStarted)
{
var initializeTask = InitializeResponseAsync(data.Length);
// Just about to Flush the headers
_requestProcessingStatus = RequestProcessingStatus.HeadersFlushed;
// If return is Task.CompletedTask no awaiting is required
if (!ReferenceEquals(initializeTask, Task.CompletedTask))
{
return WriteAsyncAwaited(initializeTask, data, cancellationToken);
}
return FirstWriteAsync(data, cancellationToken);
}
else
{
@ -1354,7 +1361,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
if (data.Length == 0)
{
return !firstWrite ? default : Output.FlushAsync(cancellationToken);
return default;
}
return Output.WriteChunkAsync(data.Span, cancellationToken);
@ -1368,7 +1375,58 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
else
{
HandleNonBodyResponseWrite();
return !firstWrite ? default : Output.FlushAsync(cancellationToken);
return default;
}
}
private ValueTask<FlushResult> FirstWriteAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
Debug.Assert(!HasResponseStarted);
var startingTask = FireOnStarting();
if (!ReferenceEquals(startingTask, Task.CompletedTask))
{
return FirstWriteAsyncAwaited(startingTask, data, cancellationToken);
}
return FirstWriteAsyncInternal(data, cancellationToken);
}
private async ValueTask<FlushResult> FirstWriteAsyncAwaited(Task initializeTask, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
await initializeTask;
return await FirstWriteAsyncInternal(data, cancellationToken);
}
private ValueTask<FlushResult> FirstWriteAsyncInternal(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
var responseHeaders = InitializeResponseFirstWrite(data.Length);
if (_canWriteResponseBody)
{
if (_autoChunk)
{
if (data.Length == 0)
{
Output.WriteResponseHeaders(StatusCode, ReasonPhrase, responseHeaders, _autoChunk);
return Output.FlushAsync(cancellationToken);
}
return Output.FirstWriteChunkedAsync(StatusCode, ReasonPhrase, responseHeaders, _autoChunk, data.Span, cancellationToken);
}
else
{
CheckLastWrite();
return Output.FirstWriteAsync(StatusCode, ReasonPhrase, responseHeaders, _autoChunk, data.Span, cancellationToken);
}
}
else
{
Output.WriteResponseHeaders(StatusCode, ReasonPhrase, responseHeaders, _autoChunk);
HandleNonBodyResponseWrite();
return Output.FlushAsync(cancellationToken);
}
}

View File

@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
ValueTask<FlushResult> WriteChunkAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken);
ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken);
ValueTask<FlushResult> Write100ContinueAsync();
void WriteResponseHeaders(int statusCode, string ReasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk);
void WriteResponseHeaders(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk);
// This takes ReadOnlySpan instead of ReadOnlyMemory because it always synchronously copies data before flushing.
ValueTask<FlushResult> WriteDataToPipeAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken);
Task WriteDataAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken);
@ -23,5 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
Memory<byte> GetMemory(int sizeHint = 0);
void CancelPendingFlush();
void Complete();
ValueTask<FlushResult> FirstWriteAsync(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk, ReadOnlySpan<byte> data, CancellationToken cancellationToken);
ValueTask<FlushResult> FirstWriteChunkedAsync(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk, ReadOnlySpan<byte> data, CancellationToken cancellationToken);
}
}

View File

@ -253,11 +253,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
public ValueTask<FlushResult> FirstWriteAsync(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk, ReadOnlySpan<byte> data, CancellationToken cancellationToken)
{
lock (_dataWriterLock)
{
WriteResponseHeaders(statusCode, reasonPhrase, responseHeaders, autoChunk);
return WriteDataToPipeAsync(data, cancellationToken);
}
}
ValueTask<FlushResult> IHttpOutputProducer.WriteChunkAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public ValueTask<FlushResult> FirstWriteChunkedAsync(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk, ReadOnlySpan<byte> data, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public void Complete()
{
// This will noop for now. See: https://github.com/aspnet/AspNetCore/issues/7370