React to pipeline changes (#570)

This commit is contained in:
Pavel Krymets 2018-01-29 14:35:22 -08:00 committed by GitHub
parent 85ec92012a
commit bc1a60704b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 42 deletions

View File

@ -28,18 +28,18 @@
<MicrosoftExtensionsPlatformAbstractionsPackageVersion>1.1.0</MicrosoftExtensionsPlatformAbstractionsPackageVersion> <MicrosoftExtensionsPlatformAbstractionsPackageVersion>1.1.0</MicrosoftExtensionsPlatformAbstractionsPackageVersion>
<MicrosoftExtensionsSecurityHelperSourcesPackageVersion>2.1.0-preview1-28153</MicrosoftExtensionsSecurityHelperSourcesPackageVersion> <MicrosoftExtensionsSecurityHelperSourcesPackageVersion>2.1.0-preview1-28153</MicrosoftExtensionsSecurityHelperSourcesPackageVersion>
<MicrosoftNETCoreApp20PackageVersion>2.0.0</MicrosoftNETCoreApp20PackageVersion> <MicrosoftNETCoreApp20PackageVersion>2.0.0</MicrosoftNETCoreApp20PackageVersion>
<MicrosoftNETCoreApp21PackageVersion>2.1.0-preview1-26115-03</MicrosoftNETCoreApp21PackageVersion> <MicrosoftNETCoreApp21PackageVersion>2.1.0-preview1-26126-02</MicrosoftNETCoreApp21PackageVersion>
<MicrosoftNetHttpHeadersPackageVersion>2.1.0-preview1-28153</MicrosoftNetHttpHeadersPackageVersion> <MicrosoftNetHttpHeadersPackageVersion>2.1.0-preview1-28153</MicrosoftNetHttpHeadersPackageVersion>
<MicrosoftNETTestSdkPackageVersion>15.3.0</MicrosoftNETTestSdkPackageVersion> <MicrosoftNETTestSdkPackageVersion>15.3.0</MicrosoftNETTestSdkPackageVersion>
<MicrosoftWebAdministrationPackageVersion>7.0.0</MicrosoftWebAdministrationPackageVersion> <MicrosoftWebAdministrationPackageVersion>7.0.0</MicrosoftWebAdministrationPackageVersion>
<SystemBuffersPackageVersion>4.5.0-preview1-26112-01</SystemBuffersPackageVersion> <SystemBuffersPackageVersion>4.5.0-preview1-26126-05</SystemBuffersPackageVersion>
<SystemIOPipelinesPackageVersion>0.1.0-e180104-2</SystemIOPipelinesPackageVersion> <SystemIOPipelinesPackageVersion>0.1.0-preview1-180129-1</SystemIOPipelinesPackageVersion>
<SystemManagementAutomationPackageVersion>6.1.7601.17515</SystemManagementAutomationPackageVersion> <SystemManagementAutomationPackageVersion>6.1.7601.17515</SystemManagementAutomationPackageVersion>
<SystemMemoryPackageVersion>4.5.0-preview1-26112-01</SystemMemoryPackageVersion> <SystemMemoryPackageVersion>4.5.0-preview1-26126-05</SystemMemoryPackageVersion>
<SystemNumericsVectorsPackageVersion>4.5.0-preview1-26112-01</SystemNumericsVectorsPackageVersion> <SystemNumericsVectorsPackageVersion>4.5.0-preview1-26126-05</SystemNumericsVectorsPackageVersion>
<SystemRuntimeCompilerServicesUnsafePackageVersion>4.5.0-preview1-26112-01</SystemRuntimeCompilerServicesUnsafePackageVersion> <SystemRuntimeCompilerServicesUnsafePackageVersion>4.5.0-preview1-26126-05</SystemRuntimeCompilerServicesUnsafePackageVersion>
<SystemSecurityPrincipalWindowsPackageVersion>4.5.0-preview1-26112-01</SystemSecurityPrincipalWindowsPackageVersion> <SystemSecurityPrincipalWindowsPackageVersion>4.5.0-preview1-26126-05</SystemSecurityPrincipalWindowsPackageVersion>
<SystemTextEncodingsWebUtf8PackageVersion>0.1.0-e180104-2</SystemTextEncodingsWebUtf8PackageVersion> <SystemTextEncodingsWebUtf8PackageVersion>0.1.0-preview1-180129-1</SystemTextEncodingsWebUtf8PackageVersion>
<XunitPackageVersion>2.3.1</XunitPackageVersion> <XunitPackageVersion>2.3.1</XunitPackageVersion>
<XunitRunnerVisualStudioPackageVersion>2.3.1</XunitRunnerVisualStudioPackageVersion> <XunitRunnerVisualStudioPackageVersion>2.3.1</XunitRunnerVisualStudioPackageVersion>
</PropertyGroup> </PropertyGroup>

View File

@ -141,8 +141,8 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
RequestBody = new IISHttpRequestBody(this); RequestBody = new IISHttpRequestBody(this);
ResponseBody = new IISHttpResponseBody(this); ResponseBody = new IISHttpResponseBody(this);
Input = new Pipe(new PipeOptions(_memoryPool, readerScheduler: Scheduler.TaskRun)); Input = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool));
var pipe = new Pipe(new PipeOptions(_memoryPool, readerScheduler: Scheduler.TaskRun)); var pipe = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool));
Output = new OutputProducer(pipe); Output = new OutputProducer(pipe);
} }
@ -165,7 +165,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
internal WindowsPrincipal WindowsUser { get; set; } internal WindowsPrincipal WindowsUser { get; set; }
public Stream RequestBody { get; set; } public Stream RequestBody { get; set; }
public Stream ResponseBody { get; set; } public Stream ResponseBody { get; set; }
public IPipe Input { get; set; } public Pipe Input { get; set; }
public OutputProducer Output { get; set; } public OutputProducer Output { get; set; }
public IHeaderDictionary RequestHeaders { get; set; } public IHeaderDictionary RequestHeaders { get; set; }
@ -253,7 +253,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
} }
finally finally
{ {
Input.Reader.Advance(readableBuffer.End, readableBuffer.End); Input.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
} }
} }
} }
@ -438,22 +438,22 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
while (true) while (true)
{ {
// These buffers are pinned // These buffers are pinned
var wb = Input.Writer.Alloc(MinAllocBufferSize); var wb = Input.Writer.GetMemory(MinAllocBufferSize);
_inputHandle = wb.Buffer.Retain(true); _inputHandle = wb.Retain(true);
try try
{ {
int read = 0; int read = 0;
if (_wasUpgraded) if (_wasUpgraded)
{ {
read = await ReadWebSocketsAsync(wb.Buffer.Length); read = await ReadWebSocketsAsync(wb.Length);
} }
else else
{ {
_currentOperation = _currentOperation.ContinueWith(async (t) => _currentOperation = _currentOperation.ContinueWith(async (t) =>
{ {
_currentOperationType = CurrentOperationType.Read; _currentOperationType = CurrentOperationType.Read;
read = await ReadAsync(wb.Buffer.Length); read = await ReadAsync(wb.Length);
}).Unwrap(); }).Unwrap();
await _currentOperation; await _currentOperation;
} }
@ -463,15 +463,15 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
break; break;
} }
wb.Advance(read); Input.Writer.Advance(read);
} }
finally finally
{ {
wb.Commit(); Input.Writer.Commit();
_inputHandle.Dispose(); _inputHandle.Dispose();
} }
var result = await wb.FlushAsync(); var result = await Input.Writer.FlushAsync();
if (result.IsCompleted || result.IsCancelled) if (result.IsCompleted || result.IsCancelled)
{ {
@ -555,19 +555,19 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
} }
finally finally
{ {
Output.Reader.Advance(consumed); Output.Reader.AdvanceTo(consumed);
} }
} }
Output.Reader.Complete(); Output.Reader.Complete();
} }
private unsafe IISAwaitable WriteAsync(ReadOnlyBuffer buffer) private unsafe IISAwaitable WriteAsync(ReadOnlyBuffer<byte> buffer)
{ {
var fCompletionExpected = false; var fCompletionExpected = false;
var hr = 0; var hr = 0;
var nChunks = 0; var nChunks = 0;
if (buffer.IsSingleSpan) if (buffer.IsSingleSegment)
{ {
nChunks = 1; nChunks = 1;
} }
@ -579,7 +579,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
} }
} }
if (buffer.IsSingleSpan) if (buffer.IsSingleSegment)
{ {
var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1]; var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1];

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System; using System;
using System.Buffers;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -17,7 +18,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
private bool _completed = false; private bool _completed = false;
private readonly IPipe _pipe; private readonly Pipe _pipe;
// https://github.com/dotnet/corefxlab/issues/1334 // https://github.com/dotnet/corefxlab/issues/1334
// Pipelines don't support multiple awaiters on flush // Pipelines don't support multiple awaiters on flush
@ -26,13 +27,13 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
private readonly object _flushLock = new object(); private readonly object _flushLock = new object();
private Action _flushCompleted; private Action _flushCompleted;
public OutputProducer(IPipe pipe) public OutputProducer(Pipe pipe)
{ {
_pipe = pipe; _pipe = pipe;
_flushCompleted = OnFlushCompleted; _flushCompleted = OnFlushCompleted;
} }
public IPipeReader Reader => _pipe.Reader; public PipeReader Reader => _pipe.Reader;
public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{ {
@ -73,8 +74,6 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
ArraySegment<byte> buffer, ArraySegment<byte> buffer,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var writableBuffer = default(WritableBuffer);
lock (_contextLock) lock (_contextLock)
{ {
if (_completed) if (_completed)
@ -82,26 +81,16 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
throw new ObjectDisposedException("Response is already completed"); throw new ObjectDisposedException("Response is already completed");
} }
writableBuffer = _pipe.Writer.Alloc(1); _pipe.Writer.Write(new ReadOnlySpan<byte>(buffer.Array, buffer.Offset, buffer.Count));
// TODO obsolete
#pragma warning disable CS0618 // Type or member is obsolete
var writer = new WritableBufferWriter(writableBuffer);
#pragma warning restore CS0618 // Type or member is obsolete
if (buffer.Count > 0)
{
writer.Write(buffer.Array, buffer.Offset, buffer.Count);
}
writableBuffer.Commit();
} }
return FlushAsync(writableBuffer, cancellationToken); return FlushAsync(_pipe.Writer, cancellationToken);
} }
private Task FlushAsync(WritableBuffer writableBuffer, private Task FlushAsync(PipeWriter pipeWriter,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var awaitable = writableBuffer.FlushAsync(cancellationToken); var awaitable = pipeWriter.FlushAsync(cancellationToken);
if (awaitable.IsCompleted) if (awaitable.IsCompleted)
{ {
// The flush task can't fail today // The flush task can't fail today