From bd19022c4c97b92e7dcd247960756257d7cd1331 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 10 Jan 2017 01:03:46 -0800 Subject: [PATCH] Fixed teardown for streaming connections - Added mega hack for cancellation until we get newer pipeline implementations. --- .../Internal/FramingChannel.cs | 29 +++++++++++++++++-- .../Internal/PipelineConnection.cs | 16 ++++++---- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/Microsoft.AspNetCore.Sockets/Internal/FramingChannel.cs b/src/Microsoft.AspNetCore.Sockets/Internal/FramingChannel.cs index 3ac63436a7..fa791075c2 100644 --- a/src/Microsoft.AspNetCore.Sockets/Internal/FramingChannel.cs +++ b/src/Microsoft.AspNetCore.Sockets/Internal/FramingChannel.cs @@ -43,6 +43,15 @@ namespace Microsoft.AspNetCore.Sockets.Internal } } + private void CancelRead() + { + // We need to fake cancellation support until we get a newer build of pipelines that has CancelPendingRead() + + // HACK: from hell, we attempt to cast the input to a pipeline writer and write 0 bytes so it so that we can + // force yielding the awaiter, this is buggy because overlapping writes can be a problem. + (_connection.Input as IPipelineWriter)?.WriteAsync(Span.Empty); + } + bool IReadableChannel.TryRead(out Message item) { // We need to think about how we do this. There's no way to check if there is data available in a Pipeline... though maybe there should be @@ -81,14 +90,18 @@ namespace Microsoft.AspNetCore.Sockets.Internal bool IWritableChannel.TryComplete(Exception error) { _connection.Output.Complete(error); + _connection.Input.Complete(error); return true; } private async Task AwaitReadAsync(ReadableBufferAwaitable awaiter, CancellationToken cancellationToken) { - // Just await and then call ReadSync - var result = await awaiter; - return ReadSync(result, cancellationToken); + using (cancellationToken.Register(state => ((FramingChannel)state).CancelRead(), this)) + { + // Just await and then call ReadSync + var result = await awaiter; + return ReadSync(result, cancellationToken); + } } private Message ReadSync(ReadResult result, CancellationToken cancellationToken) @@ -107,6 +120,16 @@ namespace Microsoft.AspNetCore.Sockets.Internal _tcs.TrySetResult(null); } + if (cancellationToken.IsCancellationRequested) + { + _tcs.TrySetCanceled(); + + msg.Dispose(); + + // In order to keep the behavior consistent between the transports, we throw if the token was cancelled + throw new OperationCanceledException(); + } + return msg; } diff --git a/src/Microsoft.AspNetCore.Sockets/Internal/PipelineConnection.cs b/src/Microsoft.AspNetCore.Sockets/Internal/PipelineConnection.cs index 6390679e88..85fc313c3a 100644 --- a/src/Microsoft.AspNetCore.Sockets/Internal/PipelineConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets/Internal/PipelineConnection.cs @@ -1,16 +1,20 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using System.IO.Pipelines; namespace Microsoft.AspNetCore.Sockets.Internal { public class PipelineConnection : IPipelineConnection { - public IPipelineReader Input { get; } - public IPipelineWriter Output { get; } + public PipelineReaderWriter Input { get; } + public PipelineReaderWriter Output { get; } - public PipelineConnection(IPipelineReader input, IPipelineWriter output) + IPipelineReader IPipelineConnection.Input => Input; + IPipelineWriter IPipelineConnection.Output => Output; + + public PipelineConnection(PipelineReaderWriter input, PipelineReaderWriter output) { Input = input; Output = output; @@ -18,8 +22,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal public void Dispose() { - Input.Complete(); - Output.Complete(); + Input.CompleteReader(); + Input.CompleteWriter(); + Output.CompleteReader(); + Output.CompleteWriter(); } } }