Move reading and writing of Pipelines to a single loop; Cancel Read every time we write. (#582)

This commit is contained in:
Justin Kotalik 2018-03-04 19:52:41 -08:00 committed by GitHub
parent cbffeb33ea
commit 4e8a9d2493
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1040 additions and 469 deletions

View File

@ -3,7 +3,7 @@
<MSBuildAllProjects>$(MSBuildAllProjects);$(MSBuildThisFileFullPath)</MSBuildAllProjects>
</PropertyGroup>
<PropertyGroup Label="Package Versions">
<InternalAspNetCoreSdkPackageVersion>2.1.0-preview2-15721</InternalAspNetCoreSdkPackageVersion>
<InternalAspNetCoreSdkPackageVersion>2.1.0-preview2-15726</InternalAspNetCoreSdkPackageVersion>
<MicrosoftAspNetCoreAllPackageVersion>2.1.0-preview2-30220</MicrosoftAspNetCoreAllPackageVersion>
<MicrosoftAspNetCoreAuthenticationCorePackageVersion>2.1.0-preview2-30220</MicrosoftAspNetCoreAuthenticationCorePackageVersion>
<MicrosoftAspNetCoreHostingAbstractionsPackageVersion>2.1.0-preview2-30220</MicrosoftAspNetCoreHostingAbstractionsPackageVersion>

View File

@ -1,2 +1,2 @@
version:2.1.0-preview2-15721
commithash:f9bb4be59e39938ec59a6975257e26099b0d03c1
version:2.1.0-preview2-15726
commithash:599e691c41f502ed9e062b1822ce13b673fc916e

View File

@ -20,4 +20,8 @@
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="$(MicrosoftAspNetCoreHostingPackageVersion)" />
</ItemGroup>
<PropertyGroup>
<AspNetCoreModuleHostingModel>inprocess</AspNetCoreModuleHostingModel>
</PropertyGroup>
</Project>

View File

@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.IIS;
using Microsoft.AspNetCore.Server.IISIntegration;

View File

@ -90,10 +90,10 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
internal unsafe static extern int http_websockets_write_bytes(IntPtr pInProcessHandler, HttpApiTypes.HTTP_DATA_CHUNK* pDataChunks, int nChunks, PFN_WEBSOCKET_ASYNC_COMPLETION pfnCompletionCallback, IntPtr pvCompletionContext, out bool fCompletionExpected);
[DllImport(AspNetCoreModuleDll)]
public unsafe static extern int http_enable_websockets(IntPtr pHttpContext);
public unsafe static extern int http_enable_websockets(IntPtr pInProcessHandler);
[DllImport(AspNetCoreModuleDll)]
public unsafe static extern int http_cancel_io(IntPtr pHttpContext);
public unsafe static extern int http_cancel_io(IntPtr pInProcessHandler);
[DllImport(AspNetCoreModuleDll)]
public unsafe static extern int http_response_set_unknown_header(IntPtr pInProcessHandler, byte* pszHeaderName, byte* pszHeaderValue, ushort usHeaderValueLength, bool fReplace);

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
@ -18,8 +17,8 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
private Action _callback;
private Exception _exception;
private int _cbBytes;
private int _hr;
public static readonly NativeMethods.PFN_WEBSOCKET_ASYNC_COMPLETION ReadCallback = (IntPtr pHttpContext, IntPtr pCompletionInfo, IntPtr pvCompletionContext) =>
{
@ -52,15 +51,30 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
{
var exception = _exception;
var cbBytes = _cbBytes;
var hr = _hr;
// Reset the awaitable state
_exception = null;
_cbBytes = 0;
_callback = null;
_hr = 0;
if (exception != null)
{
throw exception;
// If the exception was an aborted read operation,
// return -1 to notify NativeReadAsync that the write was cancelled.
// E_OPERATIONABORTED == 0x800703e3 == -2147023901
// We also don't throw the exception here as this is expected behavior
// and can negatively impact perf if we catch an exception for each
// cann
if (hr != IISServerConstants.HResultCancelIO)
{
throw exception;
}
else
{
cbBytes = -1;
}
}
return cbBytes;
@ -86,12 +100,20 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
public void Complete(int hr, int cbBytes)
{
_hr = hr;
_exception = Marshal.GetExceptionForHR(hr);
_cbBytes = cbBytes;
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
continuation?.Invoke();
}
public Action GetCompletion(int hr, int cbBytes)
{
_hr = hr;
_exception = Marshal.GetExceptionForHR(hr);
_cbBytes = cbBytes;
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
continuation?.Invoke();
return Interlocked.Exchange(ref _callback, _callbackCompleted);
}
}
}

View File

@ -5,7 +5,6 @@ using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Security.Claims;
using System.Threading;
@ -14,7 +13,6 @@ using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Http.Features.Authentication;
using Microsoft.AspNetCore.WebUtilities;
using Microsoft.Extensions.Primitives;
namespace Microsoft.AspNetCore.Server.IISIntegration
{
@ -291,15 +289,16 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
{
throw new InvalidOperationException("CoreStrings.UpgradeCannotBeCalledMultipleTimes");
}
_wasUpgraded = true;
StatusCode = StatusCodes.Status101SwitchingProtocols;
ReasonPhrase = ReasonPhrases.GetReasonPhrase(StatusCodes.Status101SwitchingProtocols);
await UpgradeAsync();
NativeMethods.http_enable_websockets(_pInProcessHandler);
_wasUpgraded = true;
_readWebSocketsOperation = new IISAwaitable();
_writeWebSocketsOperation = new IISAwaitable();
NativeMethods.http_enable_websockets(_pInProcessHandler);
// Upgrade async will cause the stream processing to go into duplex mode
await UpgradeAsync();
return new DuplexStream(RequestBody, ResponseBody);
}

View File

@ -0,0 +1,454 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.HttpSys.Internal;
namespace Microsoft.AspNetCore.Server.IISIntegration
{
internal partial class IISHttpContext
{
/// <summary>
/// Reads data from the Input pipe to the user.
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="count"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// Start a task which will continuously call ReadFromIISAsync and WriteToIISAsync
StartProcessingRequestAndResponseBody();
while (true)
{
var result = await Input.Reader.ReadAsync();
var readableBuffer = result.Buffer;
try
{
if (!readableBuffer.IsEmpty)
{
var actual = Math.Min(readableBuffer.Length, count);
readableBuffer = readableBuffer.Slice(0, actual);
readableBuffer.CopyTo(buffer);
return (int)actual;
}
else if (result.IsCompleted)
{
return 0;
}
}
finally
{
Input.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
}
}
}
/// <summary>
/// Writes data to the output pipe.
/// </summary>
/// <param name="data"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken = default(CancellationToken))
{
if (!_hasResponseStarted)
{
return WriteAsyncAwaited(data, cancellationToken);
}
lock (_stateSync)
{
DisableReads();
return Output.WriteAsync(data, cancellationToken: cancellationToken);
}
}
/// <summary>
/// Flushes the data in the output pipe
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
if (!_hasResponseStarted)
{
return FlushAsyncAwaited(cancellationToken);
}
lock (_stateSync)
{
DisableReads();
return Output.FlushAsync(cancellationToken);
}
}
public void StartProcessingRequestAndResponseBody()
{
if (_processBodiesTask == null)
{
lock (_createReadWriteBodySync)
{
if (_processBodiesTask == null)
{
_processBodiesTask = ConsumeAsync();
}
}
}
}
private async Task FlushAsyncAwaited(CancellationToken cancellationToken)
{
await InitializeResponseAwaited();
Task flushTask;
lock (_stateSync)
{
DisableReads();
// Want to guarantee that data has been written to the pipe before releasing the lock.
flushTask = Output.FlushAsync(cancellationToken: cancellationToken);
}
await flushTask;
}
private async Task WriteAsyncAwaited(ArraySegment<byte> data, CancellationToken cancellationToken)
{
// WriteAsyncAwaited is only called for the first write to the body.
// Ensure headers are flushed if Write(Chunked)Async isn't called.
await InitializeResponseAwaited();
Task writeTask;
lock (_stateSync)
{
DisableReads();
// Want to guarantee that data has been written to the pipe before releasing the lock.
writeTask = Output.WriteAsync(data, cancellationToken: cancellationToken);
}
await writeTask;
}
// ConsumeAsync is called when either the first read or first write is done.
// There are two modes for reading and writing to the request/response bodies without upgrade.
// 1. Await all reads and try to read from the Output pipe
// 2. Done reading and await all writes.
// If the request is upgraded, we will start bidirectional streams for the input and output.
private async Task ConsumeAsync()
{
await ReadAndWriteLoopAsync();
// The ReadAndWriteLoop can return due to being upgraded. Check if _wasUpgraded is true to determine
// whether we go to a bidirectional stream or only write.
if (_wasUpgraded)
{
await StartBidirectionalStream();
}
}
private unsafe IISAwaitable ReadFromIISAsync(int length)
{
Action completion = null;
lock (_stateSync)
{
// We don't want to read if there is data available in the output pipe
// Therefore, we mark the current operation as cancelled to allow for the read
// to be requeued.
if (Output.Reader.TryRead(out var result))
{
// If the buffer is empty, it is considered a write of zero.
// we still want to cancel and allow the write to occur.
completion = _operation.GetCompletion(hr: IISServerConstants.HResultCancelIO, cbBytes: 0);
Output.Reader.AdvanceTo(result.Buffer.Start);
}
else
{
var hr = NativeMethods.http_read_request_bytes(
_pInProcessHandler,
(byte*)_inputHandle.Pointer,
length,
out var dwReceivedBytes,
out bool fCompletionExpected);
// if we complete the read synchronously, there is no need to set the reading flag
// as there is no cancelable operation.
if (!fCompletionExpected)
{
completion = _operation.GetCompletion(hr, dwReceivedBytes);
}
else
{
_reading = true;
}
}
}
// Invoke the completion outside of the lock if the reead finished synchronously.
completion?.Invoke();
return _operation;
}
private unsafe IISAwaitable WriteToIISAsync(ReadOnlySequence<byte> buffer)
{
var fCompletionExpected = false;
var hr = 0;
var nChunks = 0;
if (buffer.IsSingleSegment)
{
nChunks = 1;
}
else
{
foreach (var memory in buffer)
{
nChunks++;
}
}
if (buffer.IsSingleSegment)
{
var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1];
fixed (byte* pBuffer = &MemoryMarshal.GetReference(buffer.First.Span))
{
ref var chunk = ref pDataChunks[0];
chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
chunk.fromMemory.pBuffer = (IntPtr)pBuffer;
chunk.fromMemory.BufferLength = (uint)buffer.Length;
hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected);
}
}
else
{
// REVIEW: Do we need to guard against this getting too big? It seems unlikely that we'd have more than say 10 chunks in real life
var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[nChunks];
var currentChunk = 0;
// REVIEW: We don't really need this list since the memory is already pinned with the default pool,
// but shouldn't assume the pool implementation right now. Unfortunately, this causes a heap allocation...
var handles = new MemoryHandle[nChunks];
foreach (var b in buffer)
{
ref var handle = ref handles[currentChunk];
ref var chunk = ref pDataChunks[currentChunk];
handle = b.Retain(true);
chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
chunk.fromMemory.BufferLength = (uint)b.Length;
chunk.fromMemory.pBuffer = (IntPtr)handle.Pointer;
currentChunk++;
}
hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected);
// Free the handles
foreach (var handle in handles)
{
handle.Dispose();
}
}
if (!fCompletionExpected)
{
_operation.Complete(hr, 0);
}
return _operation;
}
private unsafe IISAwaitable FlushToIISAsync()
{
// Calls flush
var hr = 0;
hr = NativeMethods.http_flush_response_bytes(_pInProcessHandler, out var fCompletionExpected);
if (!fCompletionExpected)
{
_operation.Complete(hr, 0);
}
return _operation;
}
/// <summary>
/// Main function for control flow with IIS.
/// Uses two Pipes (Input and Output) between application calls to Read/Write/FlushAsync
/// Control Flow:
/// Try to see if there is data written by the application code (using TryRead)
/// and write it to IIS.
/// Check if the connection has been upgraded and call StartBidirectionalStreams
/// if it has.
/// Await reading from IIS, which will be cancelled if application code calls Write/FlushAsync.
/// </summary>
/// <returns>The Reading and Writing task.</returns>
public async Task ReadAndWriteLoopAsync()
{
try
{
while (true)
{
// First we check if there is anything to write from the Output pipe
// If there is, we call WriteToIISAsync
// Check if Output pipe has anything to write to IIS.
if (Output.Reader.TryRead(out var readResult))
{
var buffer = readResult.Buffer;
try
{
if (!buffer.IsEmpty)
{
// Write to IIS buffers
// Guaranteed to write the entire buffer to IIS
await WriteToIISAsync(buffer);
}
else if (readResult.IsCompleted)
{
break;
}
else
{
// Flush of zero bytes
await FlushToIISAsync();
}
}
finally
{
// Always Advance the data pointer to the end of the buffer.
Output.Reader.AdvanceTo(buffer.End);
}
}
// Check if there was an upgrade. If there is, we will replace the request and response bodies with
// two seperate loops. These will still be using the same Input and Output pipes here.
if (_upgradeTcs?.TrySetResult(null) == true)
{
// _wasUpgraded will be set at this point, exit the loop and we will check if we upgraded or not
// when going to next read/write type.
return;
}
// Now we handle the read.
var memory = Input.Writer.GetMemory();
_inputHandle = memory.Retain(true);
try
{
// Lock around invoking ReadFromIISAsync as we don't want to call CancelIo
// when calling read
var read = await ReadFromIISAsync(memory.Length);
// read value of 0 == done reading
// read value of -1 == read cancelled, still allowed to read but we
// need a write to occur first.
if (read == 0)
{
break;
}
else if (read == -1)
{
continue;
}
Input.Writer.Advance(read);
}
finally
{
// Always commit any changes to the Input pipe
_inputHandle.Dispose();
}
// Flush the read data for the Input Pipe writer
var flushResult = await Input.Writer.FlushAsync();
// If the pipe was closed, we are done reading,
if (flushResult.IsCompleted || flushResult.IsCanceled)
{
break;
}
}
// Complete the input writer as we are done reading the request body.
Input.Writer.Complete();
}
catch (Exception ex)
{
Input.Writer.Complete(ex);
}
await WriteLoopAsync();
}
/// <summary>
/// Secondary function for control flow with IIS. This is only called once we are done
/// reading the request body. We now await reading from the Output pipe.
/// </summary>
/// <returns></returns>
private async Task WriteLoopAsync()
{
try
{
while (true)
{
// Reading is done, so we will await all reads from the output pipe
var readResult = await Output.Reader.ReadAsync();
// Get data from pipe
var buffer = readResult.Buffer;
try
{
if (!buffer.IsEmpty)
{
// Write to IIS buffers
// Guaranteed to write the entire buffer to IIS
await WriteToIISAsync(buffer);
}
else if (readResult.IsCompleted)
{
break;
}
else
{
// Flush of zero bytes will
await FlushToIISAsync();
}
}
finally
{
// Always Advance the data pointer to the end of the buffer.
Output.Reader.AdvanceTo(buffer.End);
}
}
// Close the output pipe as we are done reading from it.
Output.Reader.Complete();
}
catch (Exception ex)
{
Output.Reader.Complete(ex);
}
}
// Always called from within a lock
private void DisableReads()
{
// To avoid concurrent reading and writing, if we have a pending read,
// we must cancel it.
// _reading will always be false if we upgrade to websockets, so we don't need to check wasUpgrade
// Also, we set _reading to false after cancelling to detect redundant calls
if (_reading)
{
_reading = false;
// Calls IHttpContext->CancelIo(), which will cause the OnAsyncCompletion handler to fire.
NativeMethods.http_cancel_io(_pInProcessHandler);
}
}
}
}

View File

@ -0,0 +1,225 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Microsoft.AspNetCore.HttpSys.Internal;
namespace Microsoft.AspNetCore.Server.IISIntegration
{
/// <summary>
/// Represents the websocket portion of the <see cref="IISHttpContext"/>
/// </summary>
internal partial class IISHttpContext
{
private bool _wasUpgraded; // Used for detecting repeated upgrades in IISHttpContext
private IISAwaitable _readWebSocketsOperation;
private IISAwaitable _writeWebSocketsOperation;
private TaskCompletionSource<object> _upgradeTcs;
private Task StartBidirectionalStream()
{
// IIS allows for websocket support and duplex channels only on Win8 and above
// This allows us to have two tasks for reading the request and writing the response
var readWebsocketTask = ReadWebSockets();
var writeWebsocketTask = WriteWebSockets();
return Task.WhenAll(readWebsocketTask, writeWebsocketTask);
}
public async Task UpgradeAsync()
{
if (_upgradeTcs == null)
{
_upgradeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
// Flush any contents of the OutputPipe before upgrading to websockets.
await FlushAsync();
await _upgradeTcs.Task;
}
}
private unsafe IISAwaitable ReadWebSocketsFromIISAsync(int length)
{
var hr = 0;
int dwReceivedBytes;
bool fCompletionExpected;
// For websocket calls, we can directly provide a callback function to be called once the websocket operation completes.
hr = NativeMethods.http_websockets_read_bytes(
_pInProcessHandler,
(byte*)_inputHandle.Pointer,
length,
IISAwaitable.ReadCallback,
(IntPtr)_thisHandle,
out dwReceivedBytes,
out fCompletionExpected);
if (!fCompletionExpected)
{
CompleteReadWebSockets(hr, dwReceivedBytes);
}
return _readWebSocketsOperation;
}
private unsafe IISAwaitable WriteWebSocketsFromIISAsync(ReadOnlySequence<byte> buffer)
{
var fCompletionExpected = false;
var hr = 0;
var nChunks = 0;
if (buffer.IsSingleSegment)
{
nChunks = 1;
}
else
{
foreach (var memory in buffer)
{
nChunks++;
}
}
if (buffer.IsSingleSegment)
{
var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1];
fixed (byte* pBuffer = &MemoryMarshal.GetReference(buffer.First.Span))
{
ref var chunk = ref pDataChunks[0];
chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
chunk.fromMemory.pBuffer = (IntPtr)pBuffer;
chunk.fromMemory.BufferLength = (uint)buffer.Length;
hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected);
}
}
else
{
// REVIEW: Do we need to guard against this getting too big? It seems unlikely that we'd have more than say 10 chunks in real life
var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[nChunks];
var currentChunk = 0;
// REVIEW: We don't really need this list since the memory is already pinned with the default pool,
// but shouldn't assume the pool implementation right now. Unfortunately, this causes a heap allocation...
var handles = new MemoryHandle[nChunks];
foreach (var b in buffer)
{
ref var handle = ref handles[currentChunk];
ref var chunk = ref pDataChunks[currentChunk];
handle = b.Retain(true);
chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
chunk.fromMemory.BufferLength = (uint)b.Length;
chunk.fromMemory.pBuffer = (IntPtr)handle.Pointer;
currentChunk++;
}
hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected);
foreach (var handle in handles)
{
handle.Dispose();
}
}
if (!fCompletionExpected)
{
CompleteWriteWebSockets(hr, 0);
}
return _writeWebSocketsOperation;
}
internal void CompleteWriteWebSockets(int hr, int cbBytes)
{
_writeWebSocketsOperation.Complete(hr, cbBytes);
}
internal void CompleteReadWebSockets(int hr, int cbBytes)
{
_readWebSocketsOperation.Complete(hr, cbBytes);
}
private async Task ReadWebSockets()
{
try
{
while (true)
{
var memory = Input.Writer.GetMemory();
_inputHandle = memory.Retain(true);
try
{
int read = 0;
read = await ReadWebSocketsFromIISAsync(memory.Length);
if (read == 0)
{
break;
}
Input.Writer.Advance(read);
}
finally
{
_inputHandle.Dispose();
}
var result = await Input.Writer.FlushAsync();
if (result.IsCompleted || result.IsCanceled)
{
break;
}
}
Input.Writer.Complete();
}
catch (Exception ex)
{
Input.Writer.Complete(ex);
}
}
private async Task WriteWebSockets()
{
try
{
while (true)
{
var result = await Output.Reader.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.End;
try
{
if (!buffer.IsEmpty)
{
await WriteWebSocketsFromIISAsync(buffer);
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
Output.Reader.AdvanceTo(consumed);
}
}
Output.Reader.Complete();
}
catch (Exception ex)
{
Output.Reader.Complete(ex);
}
}
}
}

View File

@ -25,16 +25,22 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
internal abstract partial class IISHttpContext : NativeRequestContext, IDisposable
{
private const int MinAllocBufferSize = 2048;
private const int PauseWriterThreshold = 65536;
private const int ResumeWriterTheshold = PauseWriterThreshold / 2;
private static bool UpgradeAvailable = (Environment.OSVersion.Version >= new Version(6, 2));
protected readonly IntPtr _pInProcessHandler;
private bool _wasUpgraded;
private bool _reading; // To know whether we are currently in a read operation.
private volatile bool _hasResponseStarted;
private int _statusCode;
private string _reasonPhrase;
private readonly object _onStartingSync = new object();
private readonly object _onCompletedSync = new object();
private readonly object _stateSync = new object();
protected readonly object _createReadWriteBodySync = new object();
protected Stack<KeyValuePair<Func<object, Task>, object>> _onStarting;
protected Stack<KeyValuePair<Func<object, Task>, object>> _onCompleted;
@ -45,20 +51,10 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
private GCHandle _thisHandle;
private MemoryHandle _inputHandle;
private IISAwaitable _operation = new IISAwaitable();
private IISAwaitable _readWebSocketsOperation;
private IISAwaitable _writeWebSocketsOperation;
private TaskCompletionSource<object> _upgradeTcs;
protected Task _readingTask;
protected Task _writingTask;
protected Task _processBodiesTask;
protected int _requestAborted;
private CurrentOperationType _currentOperationType;
private Task _currentOperation = Task.CompletedTask;
private const string NtlmString = "NTLM";
private const string NegotiateString = "Negotiate";
private const string BasicString = "Basic";
@ -141,8 +137,13 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
RequestBody = new IISHttpRequestBody(this);
ResponseBody = new IISHttpResponseBody(this);
Input = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool));
var pipe = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool));
Input = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool, minimumSegmentSize: MinAllocBufferSize));
var pipe = new Pipe(new PipeOptions(
_memoryPool,
readerScheduler: PipeScheduler.ThreadPool,
pauseWriterThreshold: PauseWriterThreshold,
resumeWriterThreshold: ResumeWriterTheshold,
minimumSegmentSize: MinAllocBufferSize));
Output = new OutputProducer(pipe);
}
@ -154,7 +155,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
public string QueryString { get; set; }
public string RawTarget { get; set; }
public CancellationToken RequestAborted { get; set; }
public bool HasResponseStarted { get; set; }
public bool HasResponseStarted => _hasResponseStarted;
public IPAddress RemoteIpAddress { get; set; }
public int RemotePort { get; set; }
public IPAddress LocalIpAddress { get; set; }
@ -178,7 +179,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
get { return _statusCode; }
set
{
if (HasResponseStarted)
if (_hasResponseStarted)
{
ThrowResponseAlreadyStartedException(nameof(StatusCode));
}
@ -191,7 +192,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
get { return _reasonPhrase; }
set
{
if (HasResponseStarted)
if (_hasResponseStarted)
{
ThrowResponseAlreadyStartedException(nameof(ReasonPhrase));
}
@ -199,108 +200,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
}
}
private IISAwaitable DoFlushAsync()
{
unsafe
{
var hr = 0;
hr = NativeMethods.http_flush_response_bytes(_pInProcessHandler, out var fCompletionExpected);
if (!fCompletionExpected)
{
_operation.Complete(hr, 0);
}
return _operation;
}
}
public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
await InitializeResponse(0);
await Output.FlushAsync(cancellationToken);
}
public async Task UpgradeAsync()
{
if (_upgradeTcs == null)
{
_upgradeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
await FlushAsync();
await _upgradeTcs.Task;
}
}
public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
StartReadingRequestBody();
while (true)
{
var result = await Input.Reader.ReadAsync();
var readableBuffer = result.Buffer;
try
{
if (!readableBuffer.IsEmpty)
{
var actual = Math.Min(readableBuffer.Length, count);
readableBuffer = readableBuffer.Slice(0, actual);
readableBuffer.CopyTo(buffer);
return (int)actual;
}
else if (result.IsCompleted)
{
return 0;
}
}
finally
{
Input.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
}
}
}
public Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken = default(CancellationToken))
{
if (!HasResponseStarted)
{
return WriteAsyncAwaited(data, cancellationToken);
}
// VerifyAndUpdateWrite(data.Count);
return Output.WriteAsync(data, cancellationToken: cancellationToken);
}
public async Task WriteAsyncAwaited(ArraySegment<byte> data, CancellationToken cancellationToken)
{
await InitializeResponseAwaited(data.Count);
// WriteAsyncAwaited is only called for the first write to the body.
// Ensure headers are flushed if Write(Chunked)Async isn't called.
await Output.WriteAsync(data, cancellationToken: cancellationToken);
}
public Task InitializeResponse(int firstWriteByteCount)
{
if (HasResponseStarted)
{
return Task.CompletedTask;
}
if (_onStarting != null)
{
return InitializeResponseAwaited(firstWriteByteCount);
}
if (_applicationException != null)
{
ThrowResponseAbortedException();
}
ProduceStart(appCompleted: false);
return Task.CompletedTask;
}
private async Task InitializeResponseAwaited(int firstWriteByteCount)
private async Task InitializeResponseAwaited()
{
await FireOnStarting();
@ -309,7 +209,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
ThrowResponseAbortedException();
}
ProduceStart(appCompleted: false);
await ProduceStart(appCompleted: false);
}
private void ThrowResponseAbortedException()
@ -317,25 +217,38 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
throw new ObjectDisposedException("Unhandled application exception", _applicationException);
}
private void ProduceStart(bool appCompleted)
private async Task ProduceStart(bool appCompleted)
{
if (HasResponseStarted)
if (_hasResponseStarted)
{
return;
}
HasResponseStarted = true;
_hasResponseStarted = true;
SendResponseHeaders(appCompleted);
StartWritingResponseBody();
// On first flush for websockets, we need to flush the headers such that
// IIS will know that an upgrade occured.
// If we don't have anything on the Output pipe, the TryRead in ReadAndWriteLoopAsync
// will fail and we will signal the upgradeTcs that we are upgrading. However, we still
// didn't flush. To fix this, we flush 0 bytes right after writing the headers.
Task flushTask;
lock (_stateSync)
{
DisableReads();
flushTask = Output.FlushAsync();
}
await flushTask;
StartProcessingRequestAndResponseBody();
}
protected Task ProduceEnd()
{
if (_applicationException != null)
{
if (HasResponseStarted)
if (_hasResponseStarted)
{
// We can no longer change the response, so we simply close the connection.
return Task.CompletedTask;
@ -350,7 +263,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
}
}
if (!HasResponseStarted)
if (!_hasResponseStarted)
{
return ProduceEndAwaited();
}
@ -367,10 +280,24 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
private async Task ProduceEndAwaited()
{
ProduceStart(appCompleted: true);
if (_hasResponseStarted)
{
return;
}
// Force flush
await Output.FlushAsync();
_hasResponseStarted = true;
SendResponseHeaders(appCompleted: true);
StartProcessingRequestAndResponseBody();
Task flushAsync;
lock (_stateSync)
{
DisableReads();
flushAsync = Output.FlushAsync();
}
await flushAsync;
}
public unsafe void SendResponseHeaders(bool appCompleted)
@ -423,280 +350,13 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
// TODO
}
public void StartReadingRequestBody()
{
if (_readingTask == null)
{
_readingTask = ProcessRequestBody();
}
}
private async Task ProcessRequestBody()
{
try
{
while (true)
{
// These buffers are pinned
var wb = Input.Writer.GetMemory(MinAllocBufferSize);
_inputHandle = wb.Retain(true);
try
{
int read = 0;
if (_wasUpgraded)
{
read = await ReadWebSocketsAsync(wb.Length);
}
else
{
_currentOperation = _currentOperation.ContinueWith(async (t) =>
{
_currentOperationType = CurrentOperationType.Read;
read = await ReadAsync(wb.Length);
}).Unwrap();
await _currentOperation;
}
if (read == 0)
{
break;
}
Input.Writer.Advance(read);
}
finally
{
_inputHandle.Dispose();
}
var result = await Input.Writer.FlushAsync();
if (result.IsCompleted || result.IsCanceled)
{
break;
}
}
Input.Writer.Complete();
}
catch (Exception ex)
{
Input.Writer.Complete(ex);
}
}
public void StartWritingResponseBody()
{
if (_writingTask == null)
{
_writingTask = ProcessResponseBody();
}
}
private async Task ProcessResponseBody()
{
while (true)
{
ReadResult result;
try
{
result = await Output.Reader.ReadAsync();
}
catch
{
Output.Reader.Complete();
return;
}
var buffer = result.Buffer;
var consumed = buffer.End;
try
{
if (result.IsCanceled)
{
break;
}
if (!buffer.IsEmpty)
{
if (_wasUpgraded)
{
await WriteAsync(buffer);
}
else
{
_currentOperation = _currentOperation.ContinueWith(async (t) =>
{
_currentOperationType = CurrentOperationType.Write;
await WriteAsync(buffer);
}).Unwrap();
await _currentOperation;
}
}
else if (result.IsCompleted)
{
break;
}
else
{
_currentOperation = _currentOperation.ContinueWith(async (t) =>
{
_currentOperationType = CurrentOperationType.Flush;
await DoFlushAsync();
}).Unwrap();
await _currentOperation;
}
_upgradeTcs?.TrySetResult(null);
}
finally
{
Output.Reader.AdvanceTo(consumed);
}
}
Output.Reader.Complete();
}
private unsafe IISAwaitable WriteAsync(ReadOnlySequence<byte> buffer)
{
var fCompletionExpected = false;
var hr = 0;
var nChunks = 0;
if (buffer.IsSingleSegment)
{
nChunks = 1;
}
else
{
foreach (var memory in buffer)
{
nChunks++;
}
}
if (buffer.IsSingleSegment)
{
var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1];
fixed (byte* pBuffer = &MemoryMarshal.GetReference(buffer.First.Span))
{
ref var chunk = ref pDataChunks[0];
chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
chunk.fromMemory.pBuffer = (IntPtr)pBuffer;
chunk.fromMemory.BufferLength = (uint)buffer.Length;
if (_wasUpgraded)
{
hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected);
}
else
{
hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected);
}
}
}
else
{
// REVIEW: Do we need to guard against this getting too big? It seems unlikely that we'd have more than say 10 chunks in real life
var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[nChunks];
var currentChunk = 0;
// REVIEW: We don't really need this list since the memory is already pinned with the default pool,
// but shouldn't assume the pool implementation right now. Unfortunately, this causes a heap allocation...
var handles = new MemoryHandle[nChunks];
foreach (var b in buffer)
{
ref var handle = ref handles[currentChunk];
ref var chunk = ref pDataChunks[currentChunk];
handle = b.Retain(true);
chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
chunk.fromMemory.BufferLength = (uint)b.Length;
chunk.fromMemory.pBuffer = (IntPtr)handle.Pointer;
currentChunk++;
}
if (_wasUpgraded)
{
hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected);
}
else
{
hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected);
}
// Free the handles
foreach (var handle in handles)
{
handle.Dispose();
}
}
if (_wasUpgraded)
{
if (!fCompletionExpected)
{
CompleteWriteWebSockets(hr, 0);
}
return _writeWebSocketsOperation;
}
else
{
if (!fCompletionExpected)
{
_operation.Complete(hr, 0);
}
return _operation;
}
}
private unsafe IISAwaitable ReadAsync(int length)
{
var hr = NativeMethods.http_read_request_bytes(
_pInProcessHandler,
(byte*)_inputHandle.Pointer,
length,
out var dwReceivedBytes,
out bool fCompletionExpected);
if (!fCompletionExpected)
{
_operation.Complete(hr, dwReceivedBytes);
}
return _operation;
}
private unsafe IISAwaitable ReadWebSocketsAsync(int length)
{
var hr = 0;
int dwReceivedBytes;
bool fCompletionExpected;
hr = NativeMethods.http_websockets_read_bytes(
_pInProcessHandler,
(byte*)_inputHandle.Pointer,
length,
IISAwaitable.ReadCallback,
(IntPtr)_thisHandle,
out dwReceivedBytes,
out fCompletionExpected);
if (!fCompletionExpected)
{
CompleteReadWebSockets(hr, dwReceivedBytes);
}
return _readWebSocketsOperation;
}
public abstract Task<bool> ProcessRequestAsync();
public void OnStarting(Func<object, Task> callback, object state)
{
lock (_onStartingSync)
{
if (HasResponseStarted)
if (_hasResponseStarted)
{
throw new InvalidOperationException("Response already started");
}
@ -809,26 +469,16 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
internal void OnAsyncCompletion(int hr, int cbBytes)
{
switch (_currentOperationType)
// Must acquire the _stateSync here as anytime we call complete, we need to hold the lock
// to avoid races with cancellation.
Action continuation;
lock (_stateSync)
{
case CurrentOperationType.Read:
case CurrentOperationType.Write:
_operation.Complete(hr, cbBytes);
break;
case CurrentOperationType.Flush:
_operation.Complete(hr, cbBytes);
break;
_reading = false;
continuation = _operation.GetCompletion(hr, cbBytes);
}
}
internal void CompleteWriteWebSockets(int hr, int cbBytes)
{
_writeWebSocketsOperation.Complete(hr, cbBytes);
}
internal void CompleteReadWebSockets(int hr, int cbBytes)
{
_readWebSocketsOperation.Complete(hr, cbBytes);
continuation?.Invoke();
}
private bool disposedValue = false; // To detect redundant calls
@ -854,7 +504,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
public override void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
Dispose(disposing: true);
}
private void ThrowResponseAlreadyStartedException(string value)
@ -862,14 +512,6 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
throw new InvalidOperationException("Response already started");
}
private enum CurrentOperationType
{
None,
Read,
Write,
Flush
}
private WindowsPrincipal GetWindowsPrincipal()
{
var hr = NativeMethods.http_get_authentication_information(_pInProcessHandler, out var authenticationType, out var token);

View File

@ -3,8 +3,8 @@
using System;
using System.Buffers;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting.Server;
@ -28,7 +28,6 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
try
{
context = _application.CreateContext(this);
await _application.ProcessRequestAsync(context);
// TODO Verification of Response
//if (Volatile.Read(ref _requestAborted) == 0)
@ -82,18 +81,15 @@ namespace Microsoft.AspNetCore.Server.IISIntegration
// The app is finished and there should be nobody writing to the response pipe
Output.Dispose();
if (_writingTask != null)
{
await _writingTask;
}
// The app is finished and there should be nobody reading from the request pipe
Input.Reader.Complete();
if (_readingTask != null)
Task processBodiesTask;
lock (_createReadWriteBodySync)
{
await _readingTask;
processBodiesTask = _processBodiesTask;
}
await processBodiesTask;
}
return success;
}

View File

@ -2,10 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

View File

@ -3,7 +3,6 @@
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Security.Principal;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Http;

View File

@ -0,0 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.Server.IISIntegration
{
internal static class IISServerConstants
{
internal const int HResultCancelIO = -2147023901;
}
}

View File

@ -59,25 +59,15 @@ IN_PROCESS_HANDLER::OnAsyncCompletion(
HRESULT hrCompletionStatus
)
{
HRESULT hr;
if (FAILED(hrCompletionStatus))
IN_PROCESS_APPLICATION* application = (IN_PROCESS_APPLICATION*)m_pApplication;
if (application == NULL)
{
return RQ_NOTIFICATION_FINISH_REQUEST;
}
else
{
// For now we are assuming we are in our own self contained box.
// TODO refactor Finished and Failure sections to handle in process and out of process failure.
// TODO verify that websocket's OnAsyncCompletion is not calling this.
IN_PROCESS_APPLICATION* application = (IN_PROCESS_APPLICATION*)m_pApplication;
if (application == NULL)
{
hr = E_FAIL;
return RQ_NOTIFICATION_FINISH_REQUEST;
}
return application->OnAsyncCompletion(cbCompletion, hrCompletionStatus, this);
}
// OnAsyncCompletion must call into the application if there was a error. We will redo calls
// to Read/Write if we called cancelIo on the IHttpContext.
return application->OnAsyncCompletion(cbCompletion, hrCompletionStatus, this);
}
VOID

View File

@ -0,0 +1,123 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Testing.xunit;
using Xunit;
namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests
{
[Collection(IISTestSiteCollection.Name)]
public class SynchronousReadAndWriteTests
{
private readonly IISTestSiteFixture _fixture;
public SynchronousReadAndWriteTests(IISTestSiteFixture fixture)
{
_fixture = fixture;
}
[ConditionalFact]
public async Task ReadAndWriteSynchronously()
{
for (int i = 0; i < 100; i++)
{
var content = new StringContent(new string('a', 100000));
var response = await _fixture.Client.PostAsync("ReadAndWriteSynchronously", content);
var responseText = await response.Content.ReadAsStringAsync();
Assert.Equal(expected: 110000, actual: responseText.Length);
}
}
[ConditionalFact]
public async Task ReadAndWriteEcho()
{
var body = new string('a', 100000);
var content = new StringContent(body);
var response = await _fixture.Client.PostAsync("ReadAndWriteEcho", content);
var responseText = await response.Content.ReadAsStringAsync();
Assert.Equal(body, responseText);
}
[ConditionalFact]
public async Task ReadAndWriteCopyToAsync()
{
var body = new string('a', 100000);
var content = new StringContent(body);
var response = await _fixture.Client.PostAsync("ReadAndWriteCopyToAsync", content);
var responseText = await response.Content.ReadAsStringAsync();
Assert.Equal(body, responseText);
}
[ConditionalFact]
public async Task ReadAndWriteEchoTwice()
{
var requestBody = new string('a', 10000);
var content = new StringContent(requestBody);
var response = await _fixture.Client.PostAsync("ReadAndWriteEchoTwice", content);
var responseText = await response.Content.ReadAsStringAsync();
Assert.Equal(requestBody.Length * 2, responseText.Length);
}
[ConditionalFact]
public void ReadAndWriteSlowConnection()
{
var ipHostEntry = Dns.GetHostEntry(_fixture.Client.BaseAddress.Host);
using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
foreach (var hostEntry in ipHostEntry.AddressList)
{
try
{
socket.Connect(hostEntry, _fixture.Client.BaseAddress.Port);
break;
}
catch (Exception)
{
// Exceptions can be thrown based on ipv6 support
}
}
var testString = "hello world";
var request = $"POST /ReadAndWriteSlowConnection HTTP/1.0\r\n" +
$"Content-Length: {testString.Length}\r\n" +
"Host: " + "localhost\r\n" +
"\r\n";
var bytes = 0;
var requestStringBytes = Encoding.ASCII.GetBytes(request);
var testStringBytes = Encoding.ASCII.GetBytes(testString);
while ((bytes += socket.Send(requestStringBytes, bytes, 1, SocketFlags.None)) < requestStringBytes.Length)
{
}
bytes = 0;
while ((bytes += socket.Send(testStringBytes, bytes, 1, SocketFlags.None)) < testStringBytes.Length)
{
Thread.Sleep(100);
}
var stringBuilder = new StringBuilder();
var buffer = new byte[4096];
int size;
while ((size = socket.Receive(buffer, buffer.Length, SocketFlags.None)) != 0)
{
stringBuilder.Append(Encoding.ASCII.GetString(buffer, 0, size));
}
Assert.Contains(new StringBuilder().Insert(0, "hello world", 100).ToString(), stringBuilder.ToString());
}
}
}
}

View File

@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests
{
public class WebsocketTests
{
}
}

View File

@ -4,6 +4,8 @@
using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
@ -35,6 +37,12 @@ namespace IISTestSite
app.Map("/CheckEnvironmentLongValueVariable", CheckEnvironmentLongValueVariable);
app.Map("/CheckAppendedEnvironmentVariable", CheckAppendedEnvironmentVariable);
app.Map("/CheckRemoveAuthEnvironmentVariable", CheckRemoveAuthEnvironmentVariable);
app.Map("/ReadAndWriteSynchronously", ReadAndWriteSynchronously);
app.Map("/ReadAndWriteEcho", ReadAndWriteEcho);
app.Map("/ReadAndWriteCopyToAsync", ReadAndWriteCopyToAsync);
app.Map("/ReadAndWriteEchoTwice", ReadAndWriteEchoTwice);
app.Map("/ReadAndWriteSlowConnection", ReadAndWriteSlowConnection);
app.Map("/WebsocketRequest", WebsocketRequest);
}
private void ServerVariable(IApplicationBuilder app)
@ -325,5 +333,97 @@ namespace IISTestSite
await context.Response.WriteAsync(variable);
});
}
private void ReadAndWriteSynchronously(IApplicationBuilder app)
{
app.Run(async context =>
{
var t2 = Task.Run(() => WriteManyTimesToResponseBody(context));
var t1 = Task.Run(() => ReadRequestBody(context));
await Task.WhenAll(t1, t2);
});
}
private async Task ReadRequestBody(HttpContext context)
{
var readBuffer = new byte[1];
var result = await context.Request.Body.ReadAsync(readBuffer, 0, 1);
while (result != 0)
{
result = await context.Request.Body.ReadAsync(readBuffer, 0, 1);
}
}
private async Task WriteManyTimesToResponseBody(HttpContext context)
{
for (var i = 0; i < 10000; i++)
{
await context.Response.WriteAsync("hello world");
}
}
private void ReadAndWriteEcho(IApplicationBuilder app)
{
app.Run(async context =>
{
var readBuffer = new byte[4096];
var result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length);
while (result != 0)
{
await context.Response.WriteAsync(Encoding.UTF8.GetString(readBuffer, 0, result));
result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length);
}
});
}
private void ReadAndWriteEchoTwice(IApplicationBuilder app)
{
app.Run(async context =>
{
var readBuffer = new byte[4096];
var result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length);
while (result != 0)
{
await context.Response.WriteAsync(Encoding.UTF8.GetString(readBuffer, 0, result));
await context.Response.Body.FlushAsync();
await context.Response.WriteAsync(Encoding.UTF8.GetString(readBuffer, 0, result));
await context.Response.Body.FlushAsync();
result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length);
}
});
}
private void ReadAndWriteSlowConnection(IApplicationBuilder app)
{
app.Run(async context =>
{
var t2 = Task.Run(() => WriteResponseBodyAFewTimes(context));
var t1 = Task.Run(() => ReadRequestBody(context));
await Task.WhenAll(t1, t2);
});
}
private async Task WriteResponseBodyAFewTimes(HttpContext context)
{
for (var i = 0; i < 100; i++)
{
await context.Response.WriteAsync("hello world");
}
}
private void WebsocketRequest(IApplicationBuilder app)
{
app.Run(async context =>
{
await context.Response.WriteAsync("test");
});
}
private void ReadAndWriteCopyToAsync(IApplicationBuilder app)
{
app.Run(async context =>
{
await context.Request.Body.CopyToAsync(context.Response.Body);
});
}
}
}