diff --git a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs index 7c55d7c698..c181fab65f 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs @@ -64,6 +64,8 @@ namespace Microsoft.AspNetCore.Sockets.Client throw new InvalidOperationException("Cannot start a connection that is not in the Initial state."); } + _logger.LogDebug("Starting connection."); + try { var connectUrl = await GetConnectUrl(Url, httpClient, _logger); @@ -71,10 +73,13 @@ namespace Microsoft.AspNetCore.Sockets.Client // Connection is being stopped while start was in progress if (_connectionState == ConnectionState.Disconnected) { + _logger.LogDebug("Connection was closed from a different thread."); return; } _transport = transport ?? new WebSocketsTransport(_loggerFactory); + + _logger.LogDebug("Starting transport '{0}' with Url: {1}", transport.GetType().Name, connectUrl); await StartTransport(connectUrl); } catch @@ -89,6 +94,8 @@ namespace Microsoft.AspNetCore.Sockets.Client { var ignore = _eventQueue.Enqueue(() => { + _logger.LogDebug("Raising Connected event"); + // Do not "simplify" - events can be removed from a different thread var connectedEventHandler = Connected; if (connectedEventHandler != null) @@ -103,8 +110,12 @@ namespace Microsoft.AspNetCore.Sockets.Client { Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected); + _logger.LogDebug("Draining event queue"); + await _eventQueue.Drain(); + _logger.LogDebug("Raising Closed event"); + // Do not "simplify" - event handlers can be removed from a different thread var closedEventHandler = Closed; if (closedEventHandler != null) @@ -196,7 +207,7 @@ namespace Microsoft.AspNetCore.Sockets.Client if (Input.TryRead(out Message message)) { _logger.LogDebug("Scheduling raising Received event."); - var ignore = _eventQueue.Enqueue(() => + var ignore = _eventQueue.Enqueue(() => { // Do not "simplify" - event handlers can be removed from a different thread var receivedEventHandler = Received; @@ -250,6 +261,8 @@ namespace Microsoft.AspNetCore.Sockets.Client var sendTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var message = new SendMessage(data, type, sendTcs); + _logger.LogDebug("Sending message"); + while (await Output.WaitToWriteAsync(cancellationToken)) { if (Output.TryWrite(message)) diff --git a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs index a294655a41..3d4c99a1a0 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/LongPollingTransport.cs @@ -42,6 +42,8 @@ namespace Microsoft.AspNetCore.Sockets.Client public Task StartAsync(Uri url, IChannelConnection application) { + _logger.LogInformation("Starting {0}", nameof(LongPollingTransport)); + _application = application; // Start sending and polling @@ -50,6 +52,8 @@ namespace Microsoft.AspNetCore.Sockets.Client Running = Task.WhenAll(_sender, _poller).ContinueWith(t => { + _logger.LogDebug("Transport stopped. Exception: '{0}'", t.Exception?.InnerException); + _application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); return t; }).Unwrap(); @@ -59,7 +63,10 @@ namespace Microsoft.AspNetCore.Sockets.Client public async Task StopAsync() { + _logger.LogInformation("Transport {0} is stopping", nameof(LongPollingTransport)); + _transportCts.Cancel(); + try { await Running; @@ -68,10 +75,13 @@ namespace Microsoft.AspNetCore.Sockets.Client { // exceptions have been handled in the Running task continuation by closing the channel with the exception } + + _logger.LogInformation("Transport {0} stopped", nameof(LongPollingTransport)); } private async Task Poll(Uri pollUrl, CancellationToken cancellationToken) { + _logger.LogInformation("Starting the receive loop"); try { while (!cancellationToken.IsCancellationRequested) @@ -84,11 +94,15 @@ namespace Microsoft.AspNetCore.Sockets.Client if (response.StatusCode == HttpStatusCode.NoContent || cancellationToken.IsCancellationRequested) { + _logger.LogDebug("The server is closing the connection"); + // Transport closed or polling stopped, we're done break; } else { + _logger.LogDebug("Receive a message from the server"); + // Read the whole payload var payload = await response.Content.ReadAsByteArrayAsync(); @@ -119,6 +133,8 @@ namespace Microsoft.AspNetCore.Sockets.Client // Make sure the send loop is terminated _transportCts.Cancel(); } + + _logger.LogInformation("Receive loop stopped"); } private IEnumerable ReadMessages(ReadOnlySpan payload) @@ -145,6 +161,8 @@ namespace Microsoft.AspNetCore.Sockets.Client private async Task SendMessages(Uri sendUrl, CancellationToken cancellationToken) { + _logger.LogInformation("Starting the send loop"); + TaskCompletionSource sendTcs = null; try { @@ -161,8 +179,13 @@ namespace Microsoft.AspNetCore.Sockets.Client request.Content = new ByteArrayContent(message.Payload); } + _logger.LogDebug("Sending a message to the server using url: '{0}'. Message type {1}", sendUrl, message.Type); + var response = await _httpClient.SendAsync(request); response.EnsureSuccessStatusCode(); + + _logger.LogDebug("Message sent successfully"); + sendTcs.SetResult(null); } } @@ -183,6 +206,8 @@ namespace Microsoft.AspNetCore.Sockets.Client // Make sure the poll loop is terminated _transportCts.Cancel(); } + + _logger.LogInformation("Send loop stopped"); } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs index 6634190624..f279eba646 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/WebSocketsTransport.cs @@ -33,6 +33,8 @@ namespace Microsoft.AspNetCore.Sockets.Client public async Task StartAsync(Uri url, IChannelConnection application) { + _logger.LogInformation("Starting {0}", nameof(WebSocketsTransport)); + if (url == null) { throw new ArgumentNullException(nameof(url)); @@ -53,6 +55,8 @@ namespace Microsoft.AspNetCore.Sockets.Client // https://github.com/SignalR/SignalR/blob/1fba14fa3437e24c204dfaf8a18db3fce8acad3c/src/Microsoft.AspNet.SignalR.Core/Owin/WebSockets/WebSocketHandler.cs#L248-L251 Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t => { + _logger.LogDebug("Transport stopped. Exception: '{0}'", t.Exception?.InnerException); + _application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null); return t; }).Unwrap(); @@ -60,6 +64,8 @@ namespace Microsoft.AspNetCore.Sockets.Client private async Task ReceiveMessages(Uri pollUrl, CancellationToken cancellationToken) { + _logger.LogInformation("Starting receive loop"); + while (!cancellationToken.IsCancellationRequested) { const int bufferSize = 1024; @@ -74,9 +80,15 @@ namespace Microsoft.AspNetCore.Sockets.Client receiveResult = await _webSocket.ReceiveAsync(buffer, cancellationToken); if (receiveResult.MessageType == WebSocketMessageType.Close) { + _logger.LogInformation("Websocket closed by the server. Close status {0}", receiveResult.CloseStatus); + _application.Output.Complete(); return; } + + _logger.LogDebug("Message received. Type: {0}, size: {1}, EndOfMessage: {2}", + receiveResult.MessageType.ToString(), receiveResult.Count, receiveResult.EndOfMessage); + var truncBuffer = new ArraySegment(buffer.Array, 0, receiveResult.Count); incomingMessage.Add(truncBuffer); totalBytes += receiveResult.Count; @@ -106,6 +118,7 @@ namespace Microsoft.AspNetCore.Sockets.Client message = new Message(buffer, messageType, receiveResult.EndOfMessage); } + _logger.LogInformation("Passing message to application. Payload size: {0}", message.Payload.Length); while (await _application.Output.WaitToWriteAsync(cancellationToken)) { if (_application.Output.TryWrite(message)) @@ -115,37 +128,46 @@ namespace Microsoft.AspNetCore.Sockets.Client } } } + + _logger.LogInformation("Receive loop stopped"); } private async Task SendMessages(Uri sendUrl, CancellationToken cancellationToken) { + _logger.LogInformation("Starting the send loop"); + while (await _application.Input.WaitToReadAsync(cancellationToken)) { while (_application.Input.TryRead(out SendMessage message)) { try { + _logger.LogDebug("Received message from application. Message type {0}. Payload size: {1}", + message.Type, message.Payload.Length); + await _webSocket.SendAsync(new ArraySegment(message.Payload), - message.Type == MessageType.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary, true, - cancellationToken); + message.Type == MessageType.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary, + true, cancellationToken); + message.SendResult.SetResult(null); } - catch (OperationCanceledException ex) + catch (OperationCanceledException) { - _logger?.LogError(ex.Message); message.SendResult.SetCanceled(); await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken); break; } catch (Exception ex) { - _logger?.LogError(ex.Message); + _logger.LogError(ex.Message); message.SendResult.SetException(ex); await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken); throw; } } } + + _logger.LogInformation("Send loop stopped"); } private async Task Connect(Uri url) @@ -167,6 +189,8 @@ namespace Microsoft.AspNetCore.Sockets.Client public async Task StopAsync() { + _logger.LogInformation("Transport {0} is stopping", nameof(WebSocketsTransport)); + await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken); _webSocket.Dispose(); @@ -178,6 +202,8 @@ namespace Microsoft.AspNetCore.Sockets.Client { // exceptions have been handled in the Running task continuation by closing the channel with the exception } + + _logger.LogInformation("Transport {0} stopped", nameof(WebSocketsTransport)); } } }