From 21be33023cc662a5f82233f6bc5f33a1a2130eba Mon Sep 17 00:00:00 2001 From: Cesar Blum Silveira Date: Wed, 8 Feb 2017 16:39:14 -0800 Subject: [PATCH] Implement APM methods in streams (#1335). --- .../Filter/Internal/LibuvStream.cs | 78 +++++++++++++++++++ .../Filter/Internal/StreamSocketOutput.cs | 49 ++++-------- .../Internal/Http/FrameRequestStream.cs | 8 +- .../Internal/Http/FrameResponseStream.cs | 40 ++++++++++ 4 files changed, 133 insertions(+), 42 deletions(-) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs index 3ae1ddbd69..0127d156a0 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs @@ -132,5 +132,83 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal { return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count); } + +#if NET451 + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = ReadAsync(buffer, offset, count, default(CancellationToken), state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + public override int EndRead(IAsyncResult asyncResult) + { + return ((Task)asyncResult).GetAwaiter().GetResult(); + } + + private Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + var tcs = new TaskCompletionSource(state); + var task = ReadAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (task2.IsCanceled) + { + tcs2.SetCanceled(); + } + else if (task2.IsFaulted) + { + tcs2.SetException(task2.Exception); + } + else + { + tcs2.SetResult(task2.Result); + } + }, tcs, cancellationToken); + return tcs.Task; + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = WriteAsync(buffer, offset, count, default(CancellationToken), state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + public override void EndWrite(IAsyncResult asyncResult) + { + ((Task)asyncResult).GetAwaiter().GetResult(); + } + + private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + var tcs = new TaskCompletionSource(state); + var task = WriteAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (task2.IsCanceled) + { + tcs2.SetCanceled(); + } + else if (task2.IsFaulted) + { + tcs2.SetException(task2.Exception); + } + else + { + tcs2.SetResult(null); + } + }, tcs, cancellationToken); + return tcs.Task; + } +#endif } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs index bfd7d5201b..052ab4f068 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs @@ -24,8 +24,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal private bool _canWrite = true; - private object _writeLock = new object(); - public StreamSocketOutput(string connectionId, Stream outputStream, MemoryPool memory, IKestrelTrace logger) { _connectionId = connectionId; @@ -36,54 +34,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal public void Write(ArraySegment buffer, bool chunk) { - lock (_writeLock) + if (buffer.Count == 0 ) { - if (buffer.Count == 0 ) - { - return; - } + return; + } - try - { - if (!_canWrite) - { - return; - } + if (chunk && buffer.Array != null) + { + var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count); + _outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count); + } - if (chunk && buffer.Array != null) - { - var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count); - _outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count); - } + _outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count); - _outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count); - - if (chunk && buffer.Array != null) - { - _outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length); - } - } - catch (Exception ex) - { - _canWrite = false; - _logger.ConnectionError(_connectionId, ex); - } + if (chunk && buffer.Array != null) + { + _outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length); } } public Task WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) { -#if NET451 - Write(buffer, chunk); - return TaskUtilities.CompletedTask; -#else if (chunk && buffer.Array != null) { return WriteAsyncChunked(buffer, cancellationToken); } return _outputStream.WriteAsync(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count, cancellationToken); -#endif } private async Task WriteAsyncChunked(ArraySegment buffer, CancellationToken cancellationToken) @@ -124,7 +101,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal block = block.Next; returnBlock.Pool.Return(returnBlock); } - + if (_canWrite) { try diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameRequestStream.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameRequestStream.cs index f2f7b379be..0874fd9441 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameRequestStream.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameRequestStream.cs @@ -75,8 +75,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http #if NET451 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { - ValidateState(default(CancellationToken)); - var task = ReadAsync(buffer, offset, count, default(CancellationToken), state); if (callback != null) { @@ -92,11 +90,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http private Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) { - ValidateState(cancellationToken); - var tcs = new TaskCompletionSource(state); - var task = _body.ReadAsync(new ArraySegment(buffer, offset, count), cancellationToken); - task.AsTask().ContinueWith((task2, state2) => + var task = ReadAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => { var tcs2 = (TaskCompletionSource)state2; if (task2.IsCanceled) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameResponseStream.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameResponseStream.cs index 61013ac16c..5a3c9159d5 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameResponseStream.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/FrameResponseStream.cs @@ -85,6 +85,46 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http _frameControl.Write(new ArraySegment(buffer, offset, count)); } +#if NET451 + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = WriteAsync(buffer, offset, count, default(CancellationToken), state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + public override void EndWrite(IAsyncResult asyncResult) + { + ((Task)asyncResult).GetAwaiter().GetResult(); + } + + private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + var tcs = new TaskCompletionSource(state); + var task = WriteAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (task2.IsCanceled) + { + tcs2.SetCanceled(); + } + else if (task2.IsFaulted) + { + tcs2.SetException(task2.Exception); + } + else + { + tcs2.SetResult(null); + } + }, tcs, cancellationToken); + return tcs.Task; + } +#endif + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { var task = ValidateState(cancellationToken);