From 4337d2a4a751ee7ccb0e0c460670c0abd044d040 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Thu, 28 Jul 2016 16:02:30 -0700 Subject: [PATCH] Implement Begin/End Read/Write methods in LibuvStream --- .../Filter/Internal/LibuvStream.cs | 78 +++++++++++++++++++ .../Filter/Internal/StreamSocketOutput.cs | 47 +++-------- 2 files changed, 90 insertions(+), 35 deletions(-) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs index 3ae1ddbd69..0127d156a0 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/LibuvStream.cs @@ -132,5 +132,83 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal { return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count); } + +#if NET451 + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = ReadAsync(buffer, offset, count, default(CancellationToken), state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + public override int EndRead(IAsyncResult asyncResult) + { + return ((Task)asyncResult).GetAwaiter().GetResult(); + } + + private Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + var tcs = new TaskCompletionSource(state); + var task = ReadAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (task2.IsCanceled) + { + tcs2.SetCanceled(); + } + else if (task2.IsFaulted) + { + tcs2.SetException(task2.Exception); + } + else + { + tcs2.SetResult(task2.Result); + } + }, tcs, cancellationToken); + return tcs.Task; + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = WriteAsync(buffer, offset, count, default(CancellationToken), state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + public override void EndWrite(IAsyncResult asyncResult) + { + ((Task)asyncResult).GetAwaiter().GetResult(); + } + + private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + var tcs = new TaskCompletionSource(state); + var task = WriteAsync(buffer, offset, count, cancellationToken); + task.ContinueWith((task2, state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (task2.IsCanceled) + { + tcs2.SetCanceled(); + } + else if (task2.IsFaulted) + { + tcs2.SetException(task2.Exception); + } + else + { + tcs2.SetResult(null); + } + }, tcs, cancellationToken); + return tcs.Task; + } +#endif } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs index bfd7d5201b..52397633fc 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/StreamSocketOutput.cs @@ -24,8 +24,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal private bool _canWrite = true; - private object _writeLock = new object(); - public StreamSocketOutput(string connectionId, Stream outputStream, MemoryPool memory, IKestrelTrace logger) { _connectionId = connectionId; @@ -36,54 +34,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal public void Write(ArraySegment buffer, bool chunk) { - lock (_writeLock) + if (buffer.Count == 0 ) { - if (buffer.Count == 0 ) - { - return; - } + return; + } - try - { - if (!_canWrite) - { - return; - } + if (chunk && buffer.Array != null) + { + var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count); + _outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count); + } - if (chunk && buffer.Array != null) - { - var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count); - _outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count); - } + _outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count); - _outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count); - - if (chunk && buffer.Array != null) - { - _outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length); - } - } - catch (Exception ex) - { - _canWrite = false; - _logger.ConnectionError(_connectionId, ex); - } + if (chunk && buffer.Array != null) + { + _outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length); } } public Task WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) { -#if NET451 - Write(buffer, chunk); - return TaskUtilities.CompletedTask; -#else if (chunk && buffer.Array != null) { return WriteAsyncChunked(buffer, cancellationToken); } return _outputStream.WriteAsync(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count, cancellationToken); -#endif } private async Task WriteAsyncChunked(ArraySegment buffer, CancellationToken cancellationToken)