From a4053acd0627c6e587935ba5a3e04e54027c7a56 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Fri, 18 Aug 2017 09:56:40 -0700 Subject: [PATCH] Give Client a chance to receive Close Frame from Server (#730) --- .../Internal/SocketClientLoggerExtensions.cs | 11 ++++++ .../WebSocketsTransport.cs | 36 ++++++++++++++----- .../WebSocketsTransportTests.cs | 2 +- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs index 4b277f1dc7..32d8bd52ed 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs @@ -62,6 +62,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal private static readonly Action _closingWebSocketFailed = LoggerMessage.Define(LogLevel.Information, 16, "{time}: Connection Id {connectionId}: Closing webSocket failed."); + private static readonly Action _cancelMessage = + LoggerMessage.Define(LogLevel.Debug, 17, "{time}: Connection Id {connectionId}: Canceled passing message to application."); + // Category: ServerSentEventsTransport and LongPollingTransport private static readonly Action _sendingMessages = LoggerMessage.Define(LogLevel.Debug, 9, "{time}: Connection Id {connectionId}: Sending {count} message(s) to the server using url: {url}."); @@ -283,6 +286,14 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal } } + public static void CancelMessage(this ILogger logger, string connectionId) + { + if (logger.IsEnabled(LogLevel.Debug)) + { + _cancelMessage(logger, DateTime.Now, connectionId, null); + } + } + public static void SendingMessages(this ILogger logger, string connectionId, int count, Uri url) { if (logger.IsEnabled(LogLevel.Debug)) diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs index 7177993346..2249c44bc9 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs @@ -19,6 +19,7 @@ namespace Microsoft.AspNetCore.Sockets.Client private readonly ClientWebSocket _webSocket = new ClientWebSocket(); private Channel _application; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); + private readonly CancellationTokenSource _receiveCts = new CancellationTokenSource(); private readonly ILogger _logger; private string _connectionId; @@ -80,7 +81,7 @@ namespace Microsoft.AspNetCore.Sockets.Client try { - while (!_transportCts.Token.IsCancellationRequested) + while (!_receiveCts.Token.IsCancellationRequested) { const int bufferSize = 4096; var totalBytes = 0; @@ -91,7 +92,7 @@ namespace Microsoft.AspNetCore.Sockets.Client var buffer = new ArraySegment(new byte[bufferSize]); //Exceptions are handled above where the send and receive tasks are being run. - receiveResult = await _webSocket.ReceiveAsync(buffer, _transportCts.Token); + receiveResult = await _webSocket.ReceiveAsync(buffer, _receiveCts.Token); if (receiveResult.MessageType == WebSocketMessageType.Close) { _logger.WebSocketClosed(_connectionId, receiveResult.CloseStatus); @@ -129,15 +130,25 @@ namespace Microsoft.AspNetCore.Sockets.Client Buffer.BlockCopy(incomingMessage[0].Array, incomingMessage[0].Offset, messageBuffer, 0, incomingMessage[0].Count); } - _logger.MessageToApp(_connectionId, messageBuffer.Length); - while (await _application.Out.WaitToWriteAsync(_transportCts.Token)) + try { - if (_application.Out.TryWrite(messageBuffer)) + if (!_transportCts.Token.IsCancellationRequested) { - incomingMessage.Clear(); - break; + _logger.MessageToApp(_connectionId, messageBuffer.Length); + while (await _application.Out.WaitToWriteAsync(_transportCts.Token)) + { + if (_application.Out.TryWrite(messageBuffer)) + { + incomingMessage.Clear(); + break; + } + } } } + catch (OperationCanceledException) + { + _logger.CancelMessage(_connectionId); + } } } catch (OperationCanceledException) @@ -198,7 +209,7 @@ namespace Microsoft.AspNetCore.Sockets.Client finally { _logger.SendStopped(_connectionId); - _transportCts.Cancel(); + TriggerCancel(); } } @@ -248,7 +259,7 @@ namespace Microsoft.AspNetCore.Sockets.Client await _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None); // shutdown the transport after a timeout in case the server does not send close frame - _transportCts.CancelAfter(TimeSpan.FromSeconds(5)); + TriggerCancel(); } } catch (Exception ex) @@ -258,5 +269,12 @@ namespace Microsoft.AspNetCore.Sockets.Client _logger.ClosingWebSocketFailed(_connectionId, ex); } } + + private void TriggerCancel() + { + // Give server 5 seconds to respond with a close frame for graceful close. + _receiveCts.CancelAfter(TimeSpan.FromSeconds(5)); + _transportCts.Cancel(); + } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs index 627b03242b..62ef0b3e91 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs @@ -62,7 +62,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection, TransferMode.Binary, connectionId: string.Empty); connectionToTransport.Out.TryComplete(); - await webSocketsTransport.Running.OrTimeout(); + await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10)); } }