diff --git a/build/dependencies.props b/build/dependencies.props index cdd013ce5d..8691d498a5 100644 --- a/build/dependencies.props +++ b/build/dependencies.props @@ -3,7 +3,7 @@ $(MSBuildAllProjects);$(MSBuildThisFileFullPath) - 2.1.0-preview2-15721 + 2.1.0-preview2-15726 2.1.0-preview2-30220 2.1.0-preview2-30220 2.1.0-preview2-30220 diff --git a/korebuild-lock.txt b/korebuild-lock.txt index e6c7fddffa..bdaa7048b3 100644 --- a/korebuild-lock.txt +++ b/korebuild-lock.txt @@ -1,2 +1,2 @@ -version:2.1.0-preview2-15721 -commithash:f9bb4be59e39938ec59a6975257e26099b0d03c1 +version:2.1.0-preview2-15726 +commithash:599e691c41f502ed9e062b1822ce13b673fc916e diff --git a/samples/NativeIISSample/NativeIISSample.csproj b/samples/NativeIISSample/NativeIISSample.csproj index d9c2419b8f..ac2d826cff 100644 --- a/samples/NativeIISSample/NativeIISSample.csproj +++ b/samples/NativeIISSample/NativeIISSample.csproj @@ -20,4 +20,8 @@ + + inprocess + + diff --git a/samples/NativeIISSample/Startup.cs b/samples/NativeIISSample/Startup.cs index 3fac22e9da..4010839be8 100644 --- a/samples/NativeIISSample/Startup.cs +++ b/samples/NativeIISSample/Startup.cs @@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Authentication; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Server.IIS; using Microsoft.AspNetCore.Server.IISIntegration; diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/NativeMethods.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/NativeMethods.cs index 7a211d83d8..e922dbd909 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/NativeMethods.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/NativeMethods.cs @@ -90,10 +90,10 @@ namespace Microsoft.AspNetCore.Server.IISIntegration internal unsafe static extern int http_websockets_write_bytes(IntPtr pInProcessHandler, HttpApiTypes.HTTP_DATA_CHUNK* pDataChunks, int nChunks, PFN_WEBSOCKET_ASYNC_COMPLETION pfnCompletionCallback, IntPtr pvCompletionContext, out bool fCompletionExpected); [DllImport(AspNetCoreModuleDll)] - public unsafe static extern int http_enable_websockets(IntPtr pHttpContext); + public unsafe static extern int http_enable_websockets(IntPtr pInProcessHandler); [DllImport(AspNetCoreModuleDll)] - public unsafe static extern int http_cancel_io(IntPtr pHttpContext); + public unsafe static extern int http_cancel_io(IntPtr pInProcessHandler); [DllImport(AspNetCoreModuleDll)] public unsafe static extern int http_response_set_unknown_header(IntPtr pInProcessHandler, byte* pszHeaderName, byte* pszHeaderValue, ushort usHeaderValueLength, bool fReplace); diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISAwaitable.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISAwaitable.cs index ef5d29b078..28cc3672d5 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISAwaitable.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISAwaitable.cs @@ -2,7 +2,6 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; @@ -18,8 +17,8 @@ namespace Microsoft.AspNetCore.Server.IISIntegration private Action _callback; private Exception _exception; - private int _cbBytes; + private int _hr; public static readonly NativeMethods.PFN_WEBSOCKET_ASYNC_COMPLETION ReadCallback = (IntPtr pHttpContext, IntPtr pCompletionInfo, IntPtr pvCompletionContext) => { @@ -52,15 +51,30 @@ namespace Microsoft.AspNetCore.Server.IISIntegration { var exception = _exception; var cbBytes = _cbBytes; + var hr = _hr; // Reset the awaitable state _exception = null; _cbBytes = 0; _callback = null; + _hr = 0; if (exception != null) { - throw exception; + // If the exception was an aborted read operation, + // return -1 to notify NativeReadAsync that the write was cancelled. + // E_OPERATIONABORTED == 0x800703e3 == -2147023901 + // We also don't throw the exception here as this is expected behavior + // and can negatively impact perf if we catch an exception for each + // cann + if (hr != IISServerConstants.HResultCancelIO) + { + throw exception; + } + else + { + cbBytes = -1; + } } return cbBytes; @@ -86,12 +100,20 @@ namespace Microsoft.AspNetCore.Server.IISIntegration public void Complete(int hr, int cbBytes) { + _hr = hr; + _exception = Marshal.GetExceptionForHR(hr); + _cbBytes = cbBytes; + var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); + continuation?.Invoke(); + } + + public Action GetCompletion(int hr, int cbBytes) + { + _hr = hr; _exception = Marshal.GetExceptionForHR(hr); _cbBytes = cbBytes; - var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); - - continuation?.Invoke(); + return Interlocked.Exchange(ref _callback, _callbackCompleted); } } } diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.FeatureCollection.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.FeatureCollection.cs index 345bd6f34e..7f1efba984 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.FeatureCollection.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.FeatureCollection.cs @@ -5,7 +5,6 @@ using System; using System.Collections; using System.Collections.Generic; using System.IO; -using System.Linq; using System.Net; using System.Security.Claims; using System.Threading; @@ -14,7 +13,6 @@ using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Http.Features.Authentication; using Microsoft.AspNetCore.WebUtilities; -using Microsoft.Extensions.Primitives; namespace Microsoft.AspNetCore.Server.IISIntegration { @@ -291,15 +289,16 @@ namespace Microsoft.AspNetCore.Server.IISIntegration { throw new InvalidOperationException("CoreStrings.UpgradeCannotBeCalledMultipleTimes"); } + _wasUpgraded = true; StatusCode = StatusCodes.Status101SwitchingProtocols; ReasonPhrase = ReasonPhrases.GetReasonPhrase(StatusCodes.Status101SwitchingProtocols); - await UpgradeAsync(); - NativeMethods.http_enable_websockets(_pInProcessHandler); - - _wasUpgraded = true; _readWebSocketsOperation = new IISAwaitable(); _writeWebSocketsOperation = new IISAwaitable(); + NativeMethods.http_enable_websockets(_pInProcessHandler); + + // Upgrade async will cause the stream processing to go into duplex mode + await UpgradeAsync(); return new DuplexStream(RequestBody, ResponseBody); } diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.ReadWrite.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.ReadWrite.cs new file mode 100644 index 0000000000..f60a2a848a --- /dev/null +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.ReadWrite.cs @@ -0,0 +1,454 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.HttpSys.Internal; + +namespace Microsoft.AspNetCore.Server.IISIntegration +{ + internal partial class IISHttpContext + { + /// + /// Reads data from the Input pipe to the user. + /// + /// + /// + /// + /// + /// + public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + // Start a task which will continuously call ReadFromIISAsync and WriteToIISAsync + StartProcessingRequestAndResponseBody(); + + while (true) + { + var result = await Input.Reader.ReadAsync(); + var readableBuffer = result.Buffer; + try + { + if (!readableBuffer.IsEmpty) + { + var actual = Math.Min(readableBuffer.Length, count); + readableBuffer = readableBuffer.Slice(0, actual); + readableBuffer.CopyTo(buffer); + return (int)actual; + } + else if (result.IsCompleted) + { + return 0; + } + } + finally + { + Input.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End); + } + } + } + + /// + /// Writes data to the output pipe. + /// + /// + /// + /// + public Task WriteAsync(ArraySegment data, CancellationToken cancellationToken = default(CancellationToken)) + { + if (!_hasResponseStarted) + { + return WriteAsyncAwaited(data, cancellationToken); + } + + lock (_stateSync) + { + DisableReads(); + return Output.WriteAsync(data, cancellationToken: cancellationToken); + } + } + + /// + /// Flushes the data in the output pipe + /// + /// + /// + public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + if (!_hasResponseStarted) + { + return FlushAsyncAwaited(cancellationToken); + } + lock (_stateSync) + { + DisableReads(); + return Output.FlushAsync(cancellationToken); + } + } + + public void StartProcessingRequestAndResponseBody() + { + if (_processBodiesTask == null) + { + lock (_createReadWriteBodySync) + { + if (_processBodiesTask == null) + { + _processBodiesTask = ConsumeAsync(); + } + } + } + } + + private async Task FlushAsyncAwaited(CancellationToken cancellationToken) + { + await InitializeResponseAwaited(); + + Task flushTask; + lock (_stateSync) + { + DisableReads(); + + // Want to guarantee that data has been written to the pipe before releasing the lock. + flushTask = Output.FlushAsync(cancellationToken: cancellationToken); + } + await flushTask; + } + + private async Task WriteAsyncAwaited(ArraySegment data, CancellationToken cancellationToken) + { + // WriteAsyncAwaited is only called for the first write to the body. + // Ensure headers are flushed if Write(Chunked)Async isn't called. + await InitializeResponseAwaited(); + + Task writeTask; + lock (_stateSync) + { + DisableReads(); + + // Want to guarantee that data has been written to the pipe before releasing the lock. + writeTask = Output.WriteAsync(data, cancellationToken: cancellationToken); + } + await writeTask; + } + + // ConsumeAsync is called when either the first read or first write is done. + // There are two modes for reading and writing to the request/response bodies without upgrade. + // 1. Await all reads and try to read from the Output pipe + // 2. Done reading and await all writes. + // If the request is upgraded, we will start bidirectional streams for the input and output. + private async Task ConsumeAsync() + { + await ReadAndWriteLoopAsync(); + + // The ReadAndWriteLoop can return due to being upgraded. Check if _wasUpgraded is true to determine + // whether we go to a bidirectional stream or only write. + if (_wasUpgraded) + { + await StartBidirectionalStream(); + } + } + + private unsafe IISAwaitable ReadFromIISAsync(int length) + { + Action completion = null; + lock (_stateSync) + { + // We don't want to read if there is data available in the output pipe + // Therefore, we mark the current operation as cancelled to allow for the read + // to be requeued. + if (Output.Reader.TryRead(out var result)) + { + // If the buffer is empty, it is considered a write of zero. + // we still want to cancel and allow the write to occur. + completion = _operation.GetCompletion(hr: IISServerConstants.HResultCancelIO, cbBytes: 0); + Output.Reader.AdvanceTo(result.Buffer.Start); + } + else + { + var hr = NativeMethods.http_read_request_bytes( + _pInProcessHandler, + (byte*)_inputHandle.Pointer, + length, + out var dwReceivedBytes, + out bool fCompletionExpected); + // if we complete the read synchronously, there is no need to set the reading flag + // as there is no cancelable operation. + if (!fCompletionExpected) + { + completion = _operation.GetCompletion(hr, dwReceivedBytes); + } + else + { + _reading = true; + } + } + } + + // Invoke the completion outside of the lock if the reead finished synchronously. + completion?.Invoke(); + + return _operation; + } + + private unsafe IISAwaitable WriteToIISAsync(ReadOnlySequence buffer) + { + var fCompletionExpected = false; + var hr = 0; + var nChunks = 0; + + if (buffer.IsSingleSegment) + { + nChunks = 1; + } + else + { + foreach (var memory in buffer) + { + nChunks++; + } + } + + if (buffer.IsSingleSegment) + { + var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1]; + + fixed (byte* pBuffer = &MemoryMarshal.GetReference(buffer.First.Span)) + { + ref var chunk = ref pDataChunks[0]; + + chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory; + chunk.fromMemory.pBuffer = (IntPtr)pBuffer; + chunk.fromMemory.BufferLength = (uint)buffer.Length; + hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected); + } + } + else + { + // REVIEW: Do we need to guard against this getting too big? It seems unlikely that we'd have more than say 10 chunks in real life + var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[nChunks]; + var currentChunk = 0; + + // REVIEW: We don't really need this list since the memory is already pinned with the default pool, + // but shouldn't assume the pool implementation right now. Unfortunately, this causes a heap allocation... + var handles = new MemoryHandle[nChunks]; + + foreach (var b in buffer) + { + ref var handle = ref handles[currentChunk]; + ref var chunk = ref pDataChunks[currentChunk]; + + handle = b.Retain(true); + + chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory; + chunk.fromMemory.BufferLength = (uint)b.Length; + chunk.fromMemory.pBuffer = (IntPtr)handle.Pointer; + + currentChunk++; + } + + hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected); + // Free the handles + foreach (var handle in handles) + { + handle.Dispose(); + } + } + + if (!fCompletionExpected) + { + _operation.Complete(hr, 0); + } + return _operation; + } + + private unsafe IISAwaitable FlushToIISAsync() + { + // Calls flush + var hr = 0; + hr = NativeMethods.http_flush_response_bytes(_pInProcessHandler, out var fCompletionExpected); + if (!fCompletionExpected) + { + _operation.Complete(hr, 0); + } + + return _operation; + } + + /// + /// Main function for control flow with IIS. + /// Uses two Pipes (Input and Output) between application calls to Read/Write/FlushAsync + /// Control Flow: + /// Try to see if there is data written by the application code (using TryRead) + /// and write it to IIS. + /// Check if the connection has been upgraded and call StartBidirectionalStreams + /// if it has. + /// Await reading from IIS, which will be cancelled if application code calls Write/FlushAsync. + /// + /// The Reading and Writing task. + public async Task ReadAndWriteLoopAsync() + { + try + { + while (true) + { + // First we check if there is anything to write from the Output pipe + // If there is, we call WriteToIISAsync + // Check if Output pipe has anything to write to IIS. + if (Output.Reader.TryRead(out var readResult)) + { + var buffer = readResult.Buffer; + + try + { + if (!buffer.IsEmpty) + { + // Write to IIS buffers + // Guaranteed to write the entire buffer to IIS + await WriteToIISAsync(buffer); + } + else if (readResult.IsCompleted) + { + break; + } + else + { + // Flush of zero bytes + await FlushToIISAsync(); + } + } + finally + { + // Always Advance the data pointer to the end of the buffer. + Output.Reader.AdvanceTo(buffer.End); + } + } + + // Check if there was an upgrade. If there is, we will replace the request and response bodies with + // two seperate loops. These will still be using the same Input and Output pipes here. + if (_upgradeTcs?.TrySetResult(null) == true) + { + // _wasUpgraded will be set at this point, exit the loop and we will check if we upgraded or not + // when going to next read/write type. + return; + } + + // Now we handle the read. + var memory = Input.Writer.GetMemory(); + _inputHandle = memory.Retain(true); + + try + { + // Lock around invoking ReadFromIISAsync as we don't want to call CancelIo + // when calling read + var read = await ReadFromIISAsync(memory.Length); + + // read value of 0 == done reading + // read value of -1 == read cancelled, still allowed to read but we + // need a write to occur first. + if (read == 0) + { + break; + } + else if (read == -1) + { + continue; + } + Input.Writer.Advance(read); + } + finally + { + // Always commit any changes to the Input pipe + _inputHandle.Dispose(); + } + + // Flush the read data for the Input Pipe writer + var flushResult = await Input.Writer.FlushAsync(); + + // If the pipe was closed, we are done reading, + if (flushResult.IsCompleted || flushResult.IsCanceled) + { + break; + } + } + + // Complete the input writer as we are done reading the request body. + Input.Writer.Complete(); + } + catch (Exception ex) + { + Input.Writer.Complete(ex); + } + + await WriteLoopAsync(); + } + + /// + /// Secondary function for control flow with IIS. This is only called once we are done + /// reading the request body. We now await reading from the Output pipe. + /// + /// + private async Task WriteLoopAsync() + { + try + { + while (true) + { + // Reading is done, so we will await all reads from the output pipe + var readResult = await Output.Reader.ReadAsync(); + + // Get data from pipe + var buffer = readResult.Buffer; + + try + { + if (!buffer.IsEmpty) + { + // Write to IIS buffers + // Guaranteed to write the entire buffer to IIS + await WriteToIISAsync(buffer); + } + else if (readResult.IsCompleted) + { + break; + } + else + { + // Flush of zero bytes will + await FlushToIISAsync(); + } + } + finally + { + // Always Advance the data pointer to the end of the buffer. + Output.Reader.AdvanceTo(buffer.End); + } + } + + // Close the output pipe as we are done reading from it. + Output.Reader.Complete(); + } + catch (Exception ex) + { + Output.Reader.Complete(ex); + } + } + + // Always called from within a lock + private void DisableReads() + { + // To avoid concurrent reading and writing, if we have a pending read, + // we must cancel it. + // _reading will always be false if we upgrade to websockets, so we don't need to check wasUpgrade + // Also, we set _reading to false after cancelling to detect redundant calls + if (_reading) + { + _reading = false; + // Calls IHttpContext->CancelIo(), which will cause the OnAsyncCompletion handler to fire. + NativeMethods.http_cancel_io(_pInProcessHandler); + } + } + } +} diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.Websockets.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.Websockets.cs new file mode 100644 index 0000000000..fa84c3ce11 --- /dev/null +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.Websockets.cs @@ -0,0 +1,225 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using Microsoft.AspNetCore.HttpSys.Internal; + +namespace Microsoft.AspNetCore.Server.IISIntegration +{ + /// + /// Represents the websocket portion of the + /// + internal partial class IISHttpContext + { + private bool _wasUpgraded; // Used for detecting repeated upgrades in IISHttpContext + + private IISAwaitable _readWebSocketsOperation; + private IISAwaitable _writeWebSocketsOperation; + private TaskCompletionSource _upgradeTcs; + + private Task StartBidirectionalStream() + { + // IIS allows for websocket support and duplex channels only on Win8 and above + // This allows us to have two tasks for reading the request and writing the response + var readWebsocketTask = ReadWebSockets(); + var writeWebsocketTask = WriteWebSockets(); + return Task.WhenAll(readWebsocketTask, writeWebsocketTask); + } + + public async Task UpgradeAsync() + { + if (_upgradeTcs == null) + { + _upgradeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + // Flush any contents of the OutputPipe before upgrading to websockets. + await FlushAsync(); + await _upgradeTcs.Task; + } + } + + private unsafe IISAwaitable ReadWebSocketsFromIISAsync(int length) + { + var hr = 0; + int dwReceivedBytes; + bool fCompletionExpected; + + // For websocket calls, we can directly provide a callback function to be called once the websocket operation completes. + hr = NativeMethods.http_websockets_read_bytes( + _pInProcessHandler, + (byte*)_inputHandle.Pointer, + length, + IISAwaitable.ReadCallback, + (IntPtr)_thisHandle, + out dwReceivedBytes, + out fCompletionExpected); + if (!fCompletionExpected) + { + CompleteReadWebSockets(hr, dwReceivedBytes); + } + + return _readWebSocketsOperation; + } + + private unsafe IISAwaitable WriteWebSocketsFromIISAsync(ReadOnlySequence buffer) + { + var fCompletionExpected = false; + var hr = 0; + var nChunks = 0; + + if (buffer.IsSingleSegment) + { + nChunks = 1; + } + else + { + foreach (var memory in buffer) + { + nChunks++; + } + } + + if (buffer.IsSingleSegment) + { + var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1]; + + fixed (byte* pBuffer = &MemoryMarshal.GetReference(buffer.First.Span)) + { + ref var chunk = ref pDataChunks[0]; + + chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory; + chunk.fromMemory.pBuffer = (IntPtr)pBuffer; + chunk.fromMemory.BufferLength = (uint)buffer.Length; + hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected); + } + } + else + { + // REVIEW: Do we need to guard against this getting too big? It seems unlikely that we'd have more than say 10 chunks in real life + var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[nChunks]; + var currentChunk = 0; + + // REVIEW: We don't really need this list since the memory is already pinned with the default pool, + // but shouldn't assume the pool implementation right now. Unfortunately, this causes a heap allocation... + var handles = new MemoryHandle[nChunks]; + + foreach (var b in buffer) + { + ref var handle = ref handles[currentChunk]; + ref var chunk = ref pDataChunks[currentChunk]; + + handle = b.Retain(true); + + chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory; + chunk.fromMemory.BufferLength = (uint)b.Length; + chunk.fromMemory.pBuffer = (IntPtr)handle.Pointer; + + currentChunk++; + } + + hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected); + + foreach (var handle in handles) + { + handle.Dispose(); + } + } + + if (!fCompletionExpected) + { + CompleteWriteWebSockets(hr, 0); + } + + return _writeWebSocketsOperation; + } + + internal void CompleteWriteWebSockets(int hr, int cbBytes) + { + _writeWebSocketsOperation.Complete(hr, cbBytes); + } + + internal void CompleteReadWebSockets(int hr, int cbBytes) + { + _readWebSocketsOperation.Complete(hr, cbBytes); + } + + private async Task ReadWebSockets() + { + try + { + while (true) + { + var memory = Input.Writer.GetMemory(); + _inputHandle = memory.Retain(true); + + try + { + int read = 0; + read = await ReadWebSocketsFromIISAsync(memory.Length); + + if (read == 0) + { + break; + } + + Input.Writer.Advance(read); + } + finally + { + _inputHandle.Dispose(); + } + + var result = await Input.Writer.FlushAsync(); + + if (result.IsCompleted || result.IsCanceled) + { + break; + } + } + Input.Writer.Complete(); + } + catch (Exception ex) + { + Input.Writer.Complete(ex); + } + } + + private async Task WriteWebSockets() + { + try + { + while (true) + { + var result = await Output.Reader.ReadAsync(); + + var buffer = result.Buffer; + var consumed = buffer.End; + + try + { + if (!buffer.IsEmpty) + { + await WriteWebSocketsFromIISAsync(buffer); + } + else if (result.IsCompleted) + { + break; + } + } + finally + { + Output.Reader.AdvanceTo(consumed); + } + } + + Output.Reader.Complete(); + } + catch (Exception ex) + { + Output.Reader.Complete(ex); + } + } + } +} diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.cs index 6a5beb7891..a2a4a34b81 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContext.cs @@ -25,16 +25,22 @@ namespace Microsoft.AspNetCore.Server.IISIntegration internal abstract partial class IISHttpContext : NativeRequestContext, IDisposable { private const int MinAllocBufferSize = 2048; + private const int PauseWriterThreshold = 65536; + private const int ResumeWriterTheshold = PauseWriterThreshold / 2; private static bool UpgradeAvailable = (Environment.OSVersion.Version >= new Version(6, 2)); protected readonly IntPtr _pInProcessHandler; - private bool _wasUpgraded; + private bool _reading; // To know whether we are currently in a read operation. + private volatile bool _hasResponseStarted; + private int _statusCode; private string _reasonPhrase; private readonly object _onStartingSync = new object(); private readonly object _onCompletedSync = new object(); + private readonly object _stateSync = new object(); + protected readonly object _createReadWriteBodySync = new object(); protected Stack, object>> _onStarting; protected Stack, object>> _onCompleted; @@ -45,20 +51,10 @@ namespace Microsoft.AspNetCore.Server.IISIntegration private GCHandle _thisHandle; private MemoryHandle _inputHandle; private IISAwaitable _operation = new IISAwaitable(); - - private IISAwaitable _readWebSocketsOperation; - private IISAwaitable _writeWebSocketsOperation; - - private TaskCompletionSource _upgradeTcs; - - protected Task _readingTask; - protected Task _writingTask; + protected Task _processBodiesTask; protected int _requestAborted; - private CurrentOperationType _currentOperationType; - private Task _currentOperation = Task.CompletedTask; - private const string NtlmString = "NTLM"; private const string NegotiateString = "Negotiate"; private const string BasicString = "Basic"; @@ -141,8 +137,13 @@ namespace Microsoft.AspNetCore.Server.IISIntegration RequestBody = new IISHttpRequestBody(this); ResponseBody = new IISHttpResponseBody(this); - Input = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool)); - var pipe = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool)); + Input = new Pipe(new PipeOptions(_memoryPool, readerScheduler: PipeScheduler.ThreadPool, minimumSegmentSize: MinAllocBufferSize)); + var pipe = new Pipe(new PipeOptions( + _memoryPool, + readerScheduler: PipeScheduler.ThreadPool, + pauseWriterThreshold: PauseWriterThreshold, + resumeWriterThreshold: ResumeWriterTheshold, + minimumSegmentSize: MinAllocBufferSize)); Output = new OutputProducer(pipe); } @@ -154,7 +155,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration public string QueryString { get; set; } public string RawTarget { get; set; } public CancellationToken RequestAborted { get; set; } - public bool HasResponseStarted { get; set; } + public bool HasResponseStarted => _hasResponseStarted; public IPAddress RemoteIpAddress { get; set; } public int RemotePort { get; set; } public IPAddress LocalIpAddress { get; set; } @@ -178,7 +179,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration get { return _statusCode; } set { - if (HasResponseStarted) + if (_hasResponseStarted) { ThrowResponseAlreadyStartedException(nameof(StatusCode)); } @@ -191,7 +192,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration get { return _reasonPhrase; } set { - if (HasResponseStarted) + if (_hasResponseStarted) { ThrowResponseAlreadyStartedException(nameof(ReasonPhrase)); } @@ -199,108 +200,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration } } - private IISAwaitable DoFlushAsync() - { - unsafe - { - var hr = 0; - hr = NativeMethods.http_flush_response_bytes(_pInProcessHandler, out var fCompletionExpected); - if (!fCompletionExpected) - { - _operation.Complete(hr, 0); - } - return _operation; - } - } - - public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) - { - await InitializeResponse(0); - await Output.FlushAsync(cancellationToken); - } - - public async Task UpgradeAsync() - { - if (_upgradeTcs == null) - { - _upgradeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await FlushAsync(); - await _upgradeTcs.Task; - } - } - - public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - StartReadingRequestBody(); - - while (true) - { - var result = await Input.Reader.ReadAsync(); - var readableBuffer = result.Buffer; - try - { - if (!readableBuffer.IsEmpty) - { - var actual = Math.Min(readableBuffer.Length, count); - readableBuffer = readableBuffer.Slice(0, actual); - readableBuffer.CopyTo(buffer); - return (int)actual; - } - else if (result.IsCompleted) - { - return 0; - } - } - finally - { - Input.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End); - } - } - } - - public Task WriteAsync(ArraySegment data, CancellationToken cancellationToken = default(CancellationToken)) - { - if (!HasResponseStarted) - { - return WriteAsyncAwaited(data, cancellationToken); - } - - // VerifyAndUpdateWrite(data.Count); - return Output.WriteAsync(data, cancellationToken: cancellationToken); - } - - public async Task WriteAsyncAwaited(ArraySegment data, CancellationToken cancellationToken) - { - await InitializeResponseAwaited(data.Count); - - // WriteAsyncAwaited is only called for the first write to the body. - // Ensure headers are flushed if Write(Chunked)Async isn't called. - await Output.WriteAsync(data, cancellationToken: cancellationToken); - } - - public Task InitializeResponse(int firstWriteByteCount) - { - if (HasResponseStarted) - { - return Task.CompletedTask; - } - - if (_onStarting != null) - { - return InitializeResponseAwaited(firstWriteByteCount); - } - - if (_applicationException != null) - { - ThrowResponseAbortedException(); - } - - ProduceStart(appCompleted: false); - - return Task.CompletedTask; - } - - private async Task InitializeResponseAwaited(int firstWriteByteCount) + private async Task InitializeResponseAwaited() { await FireOnStarting(); @@ -309,7 +209,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration ThrowResponseAbortedException(); } - ProduceStart(appCompleted: false); + await ProduceStart(appCompleted: false); } private void ThrowResponseAbortedException() @@ -317,25 +217,38 @@ namespace Microsoft.AspNetCore.Server.IISIntegration throw new ObjectDisposedException("Unhandled application exception", _applicationException); } - private void ProduceStart(bool appCompleted) + private async Task ProduceStart(bool appCompleted) { - if (HasResponseStarted) + if (_hasResponseStarted) { return; } - HasResponseStarted = true; + _hasResponseStarted = true; SendResponseHeaders(appCompleted); - StartWritingResponseBody(); + // On first flush for websockets, we need to flush the headers such that + // IIS will know that an upgrade occured. + // If we don't have anything on the Output pipe, the TryRead in ReadAndWriteLoopAsync + // will fail and we will signal the upgradeTcs that we are upgrading. However, we still + // didn't flush. To fix this, we flush 0 bytes right after writing the headers. + Task flushTask; + lock (_stateSync) + { + DisableReads(); + flushTask = Output.FlushAsync(); + } + await flushTask; + + StartProcessingRequestAndResponseBody(); } protected Task ProduceEnd() { if (_applicationException != null) { - if (HasResponseStarted) + if (_hasResponseStarted) { // We can no longer change the response, so we simply close the connection. return Task.CompletedTask; @@ -350,7 +263,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration } } - if (!HasResponseStarted) + if (!_hasResponseStarted) { return ProduceEndAwaited(); } @@ -367,10 +280,24 @@ namespace Microsoft.AspNetCore.Server.IISIntegration private async Task ProduceEndAwaited() { - ProduceStart(appCompleted: true); + if (_hasResponseStarted) + { + return; + } - // Force flush - await Output.FlushAsync(); + _hasResponseStarted = true; + + SendResponseHeaders(appCompleted: true); + StartProcessingRequestAndResponseBody(); + + Task flushAsync; + + lock (_stateSync) + { + DisableReads(); + flushAsync = Output.FlushAsync(); + } + await flushAsync; } public unsafe void SendResponseHeaders(bool appCompleted) @@ -423,280 +350,13 @@ namespace Microsoft.AspNetCore.Server.IISIntegration // TODO } - public void StartReadingRequestBody() - { - if (_readingTask == null) - { - _readingTask = ProcessRequestBody(); - } - } - - private async Task ProcessRequestBody() - { - try - { - while (true) - { - // These buffers are pinned - var wb = Input.Writer.GetMemory(MinAllocBufferSize); - _inputHandle = wb.Retain(true); - - try - { - int read = 0; - if (_wasUpgraded) - { - read = await ReadWebSocketsAsync(wb.Length); - } - else - { - _currentOperation = _currentOperation.ContinueWith(async (t) => - { - _currentOperationType = CurrentOperationType.Read; - read = await ReadAsync(wb.Length); - }).Unwrap(); - await _currentOperation; - } - - if (read == 0) - { - break; - } - - Input.Writer.Advance(read); - } - finally - { - _inputHandle.Dispose(); - } - - var result = await Input.Writer.FlushAsync(); - - if (result.IsCompleted || result.IsCanceled) - { - break; - } - } - - Input.Writer.Complete(); - } - catch (Exception ex) - { - Input.Writer.Complete(ex); - } - } - - public void StartWritingResponseBody() - { - if (_writingTask == null) - { - _writingTask = ProcessResponseBody(); - } - } - - private async Task ProcessResponseBody() - { - while (true) - { - ReadResult result; - - try - { - result = await Output.Reader.ReadAsync(); - } - catch - { - Output.Reader.Complete(); - return; - } - - var buffer = result.Buffer; - var consumed = buffer.End; - - try - { - if (result.IsCanceled) - { - break; - } - - if (!buffer.IsEmpty) - { - if (_wasUpgraded) - { - await WriteAsync(buffer); - } - else - { - _currentOperation = _currentOperation.ContinueWith(async (t) => - { - _currentOperationType = CurrentOperationType.Write; - await WriteAsync(buffer); - }).Unwrap(); - await _currentOperation; - } - } - else if (result.IsCompleted) - { - break; - } - else - { - _currentOperation = _currentOperation.ContinueWith(async (t) => - { - _currentOperationType = CurrentOperationType.Flush; - await DoFlushAsync(); - }).Unwrap(); - await _currentOperation; - } - - _upgradeTcs?.TrySetResult(null); - } - finally - { - Output.Reader.AdvanceTo(consumed); - } - } - Output.Reader.Complete(); - } - - private unsafe IISAwaitable WriteAsync(ReadOnlySequence buffer) - { - var fCompletionExpected = false; - var hr = 0; - var nChunks = 0; - - if (buffer.IsSingleSegment) - { - nChunks = 1; - } - else - { - foreach (var memory in buffer) - { - nChunks++; - } - } - - if (buffer.IsSingleSegment) - { - var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[1]; - - fixed (byte* pBuffer = &MemoryMarshal.GetReference(buffer.First.Span)) - { - ref var chunk = ref pDataChunks[0]; - - chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory; - chunk.fromMemory.pBuffer = (IntPtr)pBuffer; - chunk.fromMemory.BufferLength = (uint)buffer.Length; - if (_wasUpgraded) - { - hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected); - } - else - { - hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected); - } - } - } - else - { - // REVIEW: Do we need to guard against this getting too big? It seems unlikely that we'd have more than say 10 chunks in real life - var pDataChunks = stackalloc HttpApiTypes.HTTP_DATA_CHUNK[nChunks]; - var currentChunk = 0; - - // REVIEW: We don't really need this list since the memory is already pinned with the default pool, - // but shouldn't assume the pool implementation right now. Unfortunately, this causes a heap allocation... - var handles = new MemoryHandle[nChunks]; - - foreach (var b in buffer) - { - ref var handle = ref handles[currentChunk]; - ref var chunk = ref pDataChunks[currentChunk]; - - handle = b.Retain(true); - - chunk.DataChunkType = HttpApiTypes.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory; - chunk.fromMemory.BufferLength = (uint)b.Length; - chunk.fromMemory.pBuffer = (IntPtr)handle.Pointer; - - currentChunk++; - } - if (_wasUpgraded) - { - hr = NativeMethods.http_websockets_write_bytes(_pInProcessHandler, pDataChunks, nChunks, IISAwaitable.WriteCallback, (IntPtr)_thisHandle, out fCompletionExpected); - } - else - { - hr = NativeMethods.http_write_response_bytes(_pInProcessHandler, pDataChunks, nChunks, out fCompletionExpected); - } - // Free the handles - foreach (var handle in handles) - { - handle.Dispose(); - } - } - - if (_wasUpgraded) - { - if (!fCompletionExpected) - { - CompleteWriteWebSockets(hr, 0); - } - return _writeWebSocketsOperation; - } - else - { - if (!fCompletionExpected) - { - _operation.Complete(hr, 0); - } - return _operation; - } - } - - private unsafe IISAwaitable ReadAsync(int length) - { - var hr = NativeMethods.http_read_request_bytes( - _pInProcessHandler, - (byte*)_inputHandle.Pointer, - length, - out var dwReceivedBytes, - out bool fCompletionExpected); - if (!fCompletionExpected) - { - _operation.Complete(hr, dwReceivedBytes); - } - return _operation; - } - - private unsafe IISAwaitable ReadWebSocketsAsync(int length) - { - var hr = 0; - int dwReceivedBytes; - bool fCompletionExpected; - hr = NativeMethods.http_websockets_read_bytes( - _pInProcessHandler, - (byte*)_inputHandle.Pointer, - length, - IISAwaitable.ReadCallback, - (IntPtr)_thisHandle, - out dwReceivedBytes, - out fCompletionExpected); - if (!fCompletionExpected) - { - CompleteReadWebSockets(hr, dwReceivedBytes); - } - return _readWebSocketsOperation; - } - public abstract Task ProcessRequestAsync(); public void OnStarting(Func callback, object state) { lock (_onStartingSync) { - if (HasResponseStarted) + if (_hasResponseStarted) { throw new InvalidOperationException("Response already started"); } @@ -809,26 +469,16 @@ namespace Microsoft.AspNetCore.Server.IISIntegration internal void OnAsyncCompletion(int hr, int cbBytes) { - switch (_currentOperationType) + // Must acquire the _stateSync here as anytime we call complete, we need to hold the lock + // to avoid races with cancellation. + Action continuation; + lock (_stateSync) { - case CurrentOperationType.Read: - case CurrentOperationType.Write: - _operation.Complete(hr, cbBytes); - break; - case CurrentOperationType.Flush: - _operation.Complete(hr, cbBytes); - break; + _reading = false; + continuation = _operation.GetCompletion(hr, cbBytes); } - } - internal void CompleteWriteWebSockets(int hr, int cbBytes) - { - _writeWebSocketsOperation.Complete(hr, cbBytes); - } - - internal void CompleteReadWebSockets(int hr, int cbBytes) - { - _readWebSocketsOperation.Complete(hr, cbBytes); + continuation?.Invoke(); } private bool disposedValue = false; // To detect redundant calls @@ -854,7 +504,7 @@ namespace Microsoft.AspNetCore.Server.IISIntegration public override void Dispose() { // Do not change this code. Put cleanup code in Dispose(bool disposing) above. - Dispose(true); + Dispose(disposing: true); } private void ThrowResponseAlreadyStartedException(string value) @@ -862,14 +512,6 @@ namespace Microsoft.AspNetCore.Server.IISIntegration throw new InvalidOperationException("Response already started"); } - private enum CurrentOperationType - { - None, - Read, - Write, - Flush - } - private WindowsPrincipal GetWindowsPrincipal() { var hr = NativeMethods.http_get_authentication_information(_pInProcessHandler, out var authenticationType, out var token); diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContextOfT.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContextOfT.cs index ba748c0d6e..19877452f3 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContextOfT.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpContextOfT.cs @@ -3,8 +3,8 @@ using System; using System.Buffers; -using System.Threading.Tasks; using System.Threading; +using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting.Server; @@ -28,7 +28,6 @@ namespace Microsoft.AspNetCore.Server.IISIntegration try { context = _application.CreateContext(this); - await _application.ProcessRequestAsync(context); // TODO Verification of Response //if (Volatile.Read(ref _requestAborted) == 0) @@ -82,18 +81,15 @@ namespace Microsoft.AspNetCore.Server.IISIntegration // The app is finished and there should be nobody writing to the response pipe Output.Dispose(); - if (_writingTask != null) - { - await _writingTask; - } - // The app is finished and there should be nobody reading from the request pipe Input.Reader.Complete(); - if (_readingTask != null) + Task processBodiesTask; + lock (_createReadWriteBodySync) { - await _readingTask; + processBodiesTask = _processBodiesTask; } + await processBodiesTask; } return success; } diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpRequestBody.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpRequestBody.cs index 442fea6bce..0a51d9fc7a 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpRequestBody.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpRequestBody.cs @@ -2,10 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpServer.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpServer.cs index 78bb7f65e2..36a07752f5 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpServer.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISHttpServer.cs @@ -3,7 +3,6 @@ using System; using System.Buffers; -using System.IO.Pipelines; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISServerAuthenticationHandler.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISServerAuthenticationHandler.cs index eba243f9f3..587490bf50 100644 --- a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISServerAuthenticationHandler.cs +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISServerAuthenticationHandler.cs @@ -2,7 +2,6 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Security.Principal; using System.Threading.Tasks; using Microsoft.AspNetCore.Authentication; using Microsoft.AspNetCore.Http; diff --git a/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISServerConstants.cs b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISServerConstants.cs new file mode 100644 index 0000000000..fc137bd159 --- /dev/null +++ b/src/Microsoft.AspNetCore.Server.IISIntegration/Server/IISServerConstants.cs @@ -0,0 +1,10 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace Microsoft.AspNetCore.Server.IISIntegration +{ + internal static class IISServerConstants + { + internal const int HResultCancelIO = -2147023901; + } +} diff --git a/src/RequestHandler/inprocess/inprocesshandler.cpp b/src/RequestHandler/inprocess/inprocesshandler.cpp index e4b769ca96..e3b1025d2b 100644 --- a/src/RequestHandler/inprocess/inprocesshandler.cpp +++ b/src/RequestHandler/inprocess/inprocesshandler.cpp @@ -59,25 +59,15 @@ IN_PROCESS_HANDLER::OnAsyncCompletion( HRESULT hrCompletionStatus ) { - HRESULT hr; - if (FAILED(hrCompletionStatus)) + IN_PROCESS_APPLICATION* application = (IN_PROCESS_APPLICATION*)m_pApplication; + if (application == NULL) { return RQ_NOTIFICATION_FINISH_REQUEST; } - else - { - // For now we are assuming we are in our own self contained box. - // TODO refactor Finished and Failure sections to handle in process and out of process failure. - // TODO verify that websocket's OnAsyncCompletion is not calling this. - IN_PROCESS_APPLICATION* application = (IN_PROCESS_APPLICATION*)m_pApplication; - if (application == NULL) - { - hr = E_FAIL; - return RQ_NOTIFICATION_FINISH_REQUEST; - } - return application->OnAsyncCompletion(cbCompletion, hrCompletionStatus, this); - } + // OnAsyncCompletion must call into the application if there was a error. We will redo calls + // to Read/Write if we called cancelIo on the IHttpContext. + return application->OnAsyncCompletion(cbCompletion, hrCompletionStatus, this); } VOID diff --git a/test/IISIntegration.IISServerFunctionalTests/SynchronousReadAndWriteTests.cs b/test/IISIntegration.IISServerFunctionalTests/SynchronousReadAndWriteTests.cs new file mode 100644 index 0000000000..587df49b2c --- /dev/null +++ b/test/IISIntegration.IISServerFunctionalTests/SynchronousReadAndWriteTests.cs @@ -0,0 +1,123 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Net; +using System.Net.Http; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Testing.xunit; +using Xunit; + +namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests +{ + [Collection(IISTestSiteCollection.Name)] + public class SynchronousReadAndWriteTests + { + private readonly IISTestSiteFixture _fixture; + + public SynchronousReadAndWriteTests(IISTestSiteFixture fixture) + { + _fixture = fixture; + } + + [ConditionalFact] + public async Task ReadAndWriteSynchronously() + { + for (int i = 0; i < 100; i++) + { + var content = new StringContent(new string('a', 100000)); + var response = await _fixture.Client.PostAsync("ReadAndWriteSynchronously", content); + var responseText = await response.Content.ReadAsStringAsync(); + + Assert.Equal(expected: 110000, actual: responseText.Length); + } + } + + [ConditionalFact] + public async Task ReadAndWriteEcho() + { + var body = new string('a', 100000); + var content = new StringContent(body); + var response = await _fixture.Client.PostAsync("ReadAndWriteEcho", content); + var responseText = await response.Content.ReadAsStringAsync(); + + Assert.Equal(body, responseText); + } + + [ConditionalFact] + public async Task ReadAndWriteCopyToAsync() + { + var body = new string('a', 100000); + var content = new StringContent(body); + var response = await _fixture.Client.PostAsync("ReadAndWriteCopyToAsync", content); + var responseText = await response.Content.ReadAsStringAsync(); + + Assert.Equal(body, responseText); + } + + [ConditionalFact] + public async Task ReadAndWriteEchoTwice() + { + var requestBody = new string('a', 10000); + var content = new StringContent(requestBody); + var response = await _fixture.Client.PostAsync("ReadAndWriteEchoTwice", content); + var responseText = await response.Content.ReadAsStringAsync(); + + Assert.Equal(requestBody.Length * 2, responseText.Length); + } + + [ConditionalFact] + public void ReadAndWriteSlowConnection() + { + var ipHostEntry = Dns.GetHostEntry(_fixture.Client.BaseAddress.Host); + + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + foreach (var hostEntry in ipHostEntry.AddressList) + { + try + { + socket.Connect(hostEntry, _fixture.Client.BaseAddress.Port); + break; + } + catch (Exception) + { + // Exceptions can be thrown based on ipv6 support + } + } + + var testString = "hello world"; + var request = $"POST /ReadAndWriteSlowConnection HTTP/1.0\r\n" + + $"Content-Length: {testString.Length}\r\n" + + "Host: " + "localhost\r\n" + + "\r\n"; + var bytes = 0; + var requestStringBytes = Encoding.ASCII.GetBytes(request); + var testStringBytes = Encoding.ASCII.GetBytes(testString); + + while ((bytes += socket.Send(requestStringBytes, bytes, 1, SocketFlags.None)) < requestStringBytes.Length) + { + } + + bytes = 0; + while ((bytes += socket.Send(testStringBytes, bytes, 1, SocketFlags.None)) < testStringBytes.Length) + { + Thread.Sleep(100); + } + + var stringBuilder = new StringBuilder(); + var buffer = new byte[4096]; + int size; + while ((size = socket.Receive(buffer, buffer.Length, SocketFlags.None)) != 0) + { + stringBuilder.Append(Encoding.ASCII.GetString(buffer, 0, size)); + } + + Assert.Contains(new StringBuilder().Insert(0, "hello world", 100).ToString(), stringBuilder.ToString()); + } + } + } +} diff --git a/test/IISIntegration.IISServerFunctionalTests/WebsocketTests.cs b/test/IISIntegration.IISServerFunctionalTests/WebsocketTests.cs new file mode 100644 index 0000000000..1766745b0e --- /dev/null +++ b/test/IISIntegration.IISServerFunctionalTests/WebsocketTests.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Microsoft.AspNetCore.Server.IIS.FunctionalTests +{ + public class WebsocketTests + { + } +} diff --git a/test/IISTestSite/Startup.cs b/test/IISTestSite/Startup.cs index f9e0051105..75d6047425 100644 --- a/test/IISTestSite/Startup.cs +++ b/test/IISTestSite/Startup.cs @@ -4,6 +4,8 @@ using System; using System.IO; using System.Net; +using System.Text; +using System.Threading.Tasks; using Microsoft.AspNetCore.Authentication; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; @@ -35,6 +37,12 @@ namespace IISTestSite app.Map("/CheckEnvironmentLongValueVariable", CheckEnvironmentLongValueVariable); app.Map("/CheckAppendedEnvironmentVariable", CheckAppendedEnvironmentVariable); app.Map("/CheckRemoveAuthEnvironmentVariable", CheckRemoveAuthEnvironmentVariable); + app.Map("/ReadAndWriteSynchronously", ReadAndWriteSynchronously); + app.Map("/ReadAndWriteEcho", ReadAndWriteEcho); + app.Map("/ReadAndWriteCopyToAsync", ReadAndWriteCopyToAsync); + app.Map("/ReadAndWriteEchoTwice", ReadAndWriteEchoTwice); + app.Map("/ReadAndWriteSlowConnection", ReadAndWriteSlowConnection); + app.Map("/WebsocketRequest", WebsocketRequest); } private void ServerVariable(IApplicationBuilder app) @@ -325,5 +333,97 @@ namespace IISTestSite await context.Response.WriteAsync(variable); }); } + private void ReadAndWriteSynchronously(IApplicationBuilder app) + { + app.Run(async context => + { + var t2 = Task.Run(() => WriteManyTimesToResponseBody(context)); + var t1 = Task.Run(() => ReadRequestBody(context)); + await Task.WhenAll(t1, t2); + }); + } + + private async Task ReadRequestBody(HttpContext context) + { + var readBuffer = new byte[1]; + var result = await context.Request.Body.ReadAsync(readBuffer, 0, 1); + while (result != 0) + { + result = await context.Request.Body.ReadAsync(readBuffer, 0, 1); + } + } + + private async Task WriteManyTimesToResponseBody(HttpContext context) + { + for (var i = 0; i < 10000; i++) + { + await context.Response.WriteAsync("hello world"); + } + } + + private void ReadAndWriteEcho(IApplicationBuilder app) + { + app.Run(async context => + { + var readBuffer = new byte[4096]; + var result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length); + while (result != 0) + { + await context.Response.WriteAsync(Encoding.UTF8.GetString(readBuffer, 0, result)); + result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length); + } + }); + } + + private void ReadAndWriteEchoTwice(IApplicationBuilder app) + { + app.Run(async context => + { + var readBuffer = new byte[4096]; + var result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length); + while (result != 0) + { + await context.Response.WriteAsync(Encoding.UTF8.GetString(readBuffer, 0, result)); + await context.Response.Body.FlushAsync(); + await context.Response.WriteAsync(Encoding.UTF8.GetString(readBuffer, 0, result)); + await context.Response.Body.FlushAsync(); + result = await context.Request.Body.ReadAsync(readBuffer, 0, readBuffer.Length); + } + }); + } + + private void ReadAndWriteSlowConnection(IApplicationBuilder app) + { + app.Run(async context => + { + var t2 = Task.Run(() => WriteResponseBodyAFewTimes(context)); + var t1 = Task.Run(() => ReadRequestBody(context)); + await Task.WhenAll(t1, t2); + }); + } + + private async Task WriteResponseBodyAFewTimes(HttpContext context) + { + for (var i = 0; i < 100; i++) + { + await context.Response.WriteAsync("hello world"); + } + } + + private void WebsocketRequest(IApplicationBuilder app) + { + app.Run(async context => + { + await context.Response.WriteAsync("test"); + }); + } + + private void ReadAndWriteCopyToAsync(IApplicationBuilder app) + { + app.Run(async context => + { + await context.Request.Body.CopyToAsync(context.Response.Body); + }); + } } }