Reorganize and add more logging to Sockets.Http (#549)

* Move Sockets.Transports to internal namespace
This commit is contained in:
BrennanConroy 2017-06-15 08:24:43 -07:00 committed by GitHub
parent ec02907af1
commit 0298868c00
10 changed files with 388 additions and 59 deletions

View File

@ -9,7 +9,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.AspNetCore.Sockets.Transports;
using Microsoft.AspNetCore.Sockets.Internal.Transports;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
@ -75,6 +75,8 @@ namespace Microsoft.AspNetCore.Sockets
return;
}
_logger.EstablishedConnection(connection.ConnectionId, context.TraceIdentifier);
if (!await EnsureConnectionStateAsync(connection, context, TransportType.ServerSentEvents, supportedTransports))
{
// Bad connection state. It's already set the response status code.
@ -82,7 +84,7 @@ namespace Microsoft.AspNetCore.Sockets
}
// We only need to provide the Input channel since writing to the application is handled through /send.
var sse = new ServerSentEventsTransport(connection.Application.Input, _loggerFactory);
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory);
await DoPersistentConnection(socketDelegate, sse, context, connection);
}
@ -96,13 +98,15 @@ namespace Microsoft.AspNetCore.Sockets
return;
}
_logger.EstablishedConnection(connection.ConnectionId, context.TraceIdentifier);
if (!await EnsureConnectionStateAsync(connection, context, TransportType.WebSockets, supportedTransports))
{
// Bad connection state. It's already set the response status code.
return;
}
var ws = new WebSocketsTransport(options.WebSockets, connection.Application, _loggerFactory);
var ws = new WebSocketsTransport(options.WebSockets, connection.Application, connection.ConnectionId, _loggerFactory);
await DoPersistentConnection(socketDelegate, ws, context, connection);
}
@ -130,7 +134,7 @@ namespace Microsoft.AspNetCore.Sockets
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Disposed)
{
_logger.LogDebug("Connection {connectionId} was disposed,", connection.ConnectionId);
_logger.ConnectionDisposed(connection.ConnectionId);
// The connection was disposed
context.Response.StatusCode = StatusCodes.Status404NotFound;
@ -140,8 +144,7 @@ namespace Microsoft.AspNetCore.Sockets
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Active)
{
var existing = connection.GetHttpContext();
_logger.LogDebug("Connection {connectionId} is already active via {requestId}. Cancelling previous request.", connection.ConnectionId, existing.TraceIdentifier);
_logger.ConnectionAlreadyActive(connection.ConnectionId, existing.TraceIdentifier);
using (connection.Cancellation)
{
@ -151,7 +154,7 @@ namespace Microsoft.AspNetCore.Sockets
// Wait for the previous request to drain
await connection.TransportTask;
_logger.LogDebug("Previous poll cancelled for {connectionId} on {requestId}.", connection.ConnectionId, existing.TraceIdentifier);
_logger.PollCanceled(connection.ConnectionId, existing.TraceIdentifier);
}
}
@ -161,7 +164,7 @@ namespace Microsoft.AspNetCore.Sockets
// Raise OnConnected for new connections only since polls happen all the time
if (connection.ApplicationTask == null)
{
_logger.LogDebug("Establishing new connection: {connectionId} on {requestId}", connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
_logger.EstablishedConnection(connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
connection.Metadata[ConnectionMetadataNames.Transport] = TransportType.LongPolling;
@ -169,7 +172,7 @@ namespace Microsoft.AspNetCore.Sockets
}
else
{
_logger.LogDebug("Resuming existing connection: {connectionId} on {requestId}", connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
_logger.ResumingConnection(connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
}
// REVIEW: Performance of this isn't great as this does a bunch of per request allocations
@ -182,7 +185,7 @@ namespace Microsoft.AspNetCore.Sockets
context.Response.RegisterForDispose(timeoutSource);
context.Response.RegisterForDispose(tokenSource);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, connection.ConnectionId, _loggerFactory);
// Start the transport
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
@ -266,7 +269,7 @@ namespace Microsoft.AspNetCore.Sockets
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Disposed)
{
_logger.LogDebug("Connection {connectionId} was disposed,", connection.ConnectionId);
_logger.ConnectionDisposed(connection.ConnectionId);
// Connection was disposed
context.Response.StatusCode = StatusCodes.Status404NotFound;
@ -276,7 +279,7 @@ namespace Microsoft.AspNetCore.Sockets
// There's already an active request
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Active)
{
_logger.LogDebug("Connection {connectionId} is already active via {requestId}.", connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
_logger.ConnectionAlreadyActive(connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
// Reject the request with a 409 conflict
context.Response.StatusCode = StatusCodes.Status409Conflict;
@ -326,6 +329,8 @@ namespace Microsoft.AspNetCore.Sockets
// Get the bytes for the connection id
var negotiateResponseBuffer = Encoding.UTF8.GetBytes(GetNegotiatePayload(connection.ConnectionId, options));
_logger.NegotiationRequest(connection.ConnectionId);
// Write it out to the response with the right content length
context.Response.ContentLength = negotiateResponseBuffer.Length;
return context.Response.Body.WriteAsync(negotiateResponseBuffer, 0, negotiateResponseBuffer.Length);
@ -379,6 +384,7 @@ namespace Microsoft.AspNetCore.Sockets
buffer = stream.ToArray();
}
_logger.ReceivedBytes(connection.ConnectionId, buffer.Length);
while (!connection.Application.Output.TryWrite(buffer))
{
if (!await connection.Application.Output.WaitToWriteAsync())
@ -393,6 +399,7 @@ namespace Microsoft.AspNetCore.Sockets
if ((supportedTransports & transportType) == 0)
{
context.Response.StatusCode = StatusCodes.Status404NotFound;
_logger.TransportNotSupported(connection.ConnectionId, transportType);
await context.Response.WriteAsync($"{transportType} transport not supported by this end point type");
return false;
}
@ -406,6 +413,7 @@ namespace Microsoft.AspNetCore.Sockets
else if (transport != transportType)
{
context.Response.StatusCode = StatusCodes.Status400BadRequest;
_logger.CannotChangeTransport(connection.ConnectionId, transport.Value, transportType);
await context.Response.WriteAsync("Cannot change transports mid-connection");
return false;
}

View File

@ -0,0 +1,313 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net.WebSockets;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Internal
{
public static class SocketLoggerExtensions
{
// Category: LongPollingTransport
private static readonly Action<ILogger, DateTime, string, string, Exception> _longPolling204 =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Information, 0, "{time}: Connection Id {connectionId}, Request Id {requestId}: Terminating Long Polling connection by sending 204 response.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _pollTimedOut =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Information, 1, "{time}: Connection Id {connectionId}, Request Id {requestId}: Poll request timed out. Sending 200 response to connection.");
private static readonly Action<ILogger, DateTime, string, string, int, Exception> _longPollingWritingMessage =
LoggerMessage.Define<DateTime, string, string, int>(LogLevel.Debug, 2, "{time}: Connection Id {connectionId}, Request Id {requestId}: Writing a {count} byte message to connection.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _longPollingDisconnected =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Debug, 3, "{time}: Connection Id {connectionId}, Request Id {requestId}: Client disconnected from Long Polling endpoint for connection.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _longPollingTerminated =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Error, 4, "{time}: Connection Id {connectionId}, Request Id {requestId}: Long Polling transport was terminated due to an error on connection.");
// Category: HttpConnectionDispatcher
private static readonly Action<ILogger, DateTime, string, Exception> _connectionDisposed =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 0, "{time}: Connection Id {connectionId} was disposed.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _connectionAlreadyActive =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Debug, 1, "{time}: Connection Id {connectionId} is already active via {requestId}.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _pollCanceled =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Debug, 2, "{time}: Previous poll canceled for {connectionId} on {requestId}.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _establishedConnection =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Debug, 3, "{time}: Connection Id {connectionId}, Request Id {requestId}: Establishing new connection.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _resumingConnection =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Debug, 4, "{time}: Connection Id {connectionId}, Request Id {requestId}: Resuming existing connection.");
private static readonly Action<ILogger, DateTime, string, int, Exception> _receivedBytes =
LoggerMessage.Define<DateTime, string, int>(LogLevel.Debug, 5, "{time}: Connection Id {connectionId}: Received {count} bytes.");
private static readonly Action<ILogger, DateTime, string, TransportType, Exception> _transportNotSupported =
LoggerMessage.Define<DateTime, string, TransportType>(LogLevel.Debug, 6, "{time}: Connection Id {connectionId}: {transportType} transport not supported by this endpoint type.");
private static readonly Action<ILogger, DateTime, string, TransportType, TransportType, Exception> _cannotChangeTransport =
LoggerMessage.Define<DateTime, string, TransportType, TransportType>(LogLevel.Debug, 7, "{time}: Connection Id {connectionId}: Cannot change transports mid-connection; currently using {transportType}, requesting {requestedTransport}.");
private static readonly Action<ILogger, DateTime, string, Exception> _negotiationRequest =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 8, "{time}: Connection Id {connectionId}: Sending negotiation response.");
// Category: WebSocketsTransport
private static readonly Action<ILogger, DateTime, string, Exception> _socketOpened =
LoggerMessage.Define<DateTime, string>(LogLevel.Information, 0, "{time}: Connection Id {connectionId}: Socket opened.");
private static readonly Action<ILogger, DateTime, string, Exception> _socketClosed =
LoggerMessage.Define<DateTime, string>(LogLevel.Information, 1, "{time}: Connection Id {connectionId}: Socket closed.");
private static readonly Action<ILogger, DateTime, string, WebSocketCloseStatus?, string, Exception> _clientClosed =
LoggerMessage.Define<DateTime, string, WebSocketCloseStatus?, string>(LogLevel.Debug, 2, "{time}: Connection Id {connectionId}: Client closed connection with status code '{status}' ({description}). Signaling end-of-input to application..");
private static readonly Action<ILogger, DateTime, string, Exception> _waitingForSend =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 3, "{time}: Connection Id {connectionId}: Waiting for the application to finish sending data.");
private static readonly Action<ILogger, DateTime, string, Exception> _failedSending =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 4, "{time}: Connection Id {connectionId}: Application failed during sending. Sending InternalServerError close frame.");
private static readonly Action<ILogger, DateTime, string, Exception> _finishedSending =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 5, "{time}: Connection Id {connectionId}: Application finished sending. Sending close frame.");
private static readonly Action<ILogger, DateTime, string, Exception> _waitingForClose =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 6, "{time}: Connection Id {connectionId}: Waiting for the client to close the socket.");
private static readonly Action<ILogger, DateTime, string, Exception> _closeTimedOut =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, 7, "{time}: Connection Id {connectionId}: Timed out waiting for client to send the close frame, aborting the connection.");
private static readonly Action<ILogger, DateTime, string, WebSocketMessageType, int, bool, Exception> _messageReceived =
LoggerMessage.Define<DateTime, string, WebSocketMessageType, int, bool>(LogLevel.Debug, 8, "{time}: Connection Id {connectionId}: Message received. Type: {messageType}, size: {size}, EndOfMessage: {endOfMessage}.");
private static readonly Action<ILogger, DateTime, string, int, Exception> _messageToApplication =
LoggerMessage.Define<DateTime, string, int>(LogLevel.Debug, 9, "{time}: Connection Id {connectionId}: Passing message to application. Payload size: {size}.");
private static readonly Action<ILogger, DateTime, string, int, Exception> _sendPayload =
LoggerMessage.Define<DateTime, string, int>(LogLevel.Debug, 10, "{time}: Connection Id {connectionId}: Sending payload: {size} bytes.");
private static readonly Action<ILogger, DateTime, string, Exception> _errorWritingFrame =
LoggerMessage.Define<DateTime, string>(LogLevel.Error, 11, "{time}: Connection Id {connectionId}: Error writing frame.");
// Category: ServerSentEventsTransport
private static readonly Action<ILogger, DateTime, string, int, Exception> _sseWritingMessage =
LoggerMessage.Define<DateTime, string, int>(LogLevel.Debug, 0, "{time}: Connection Id {connectionId}: Writing a {count} byte message.");
public static void LongPolling204(this ILogger logger, string connectionId, string requestId)
{
if (logger.IsEnabled(LogLevel.Information))
{
_longPolling204(logger, DateTime.Now, connectionId, requestId, null);
}
}
public static void PollTimedOut(this ILogger logger, string connectionId, string requestId)
{
if (logger.IsEnabled(LogLevel.Information))
{
_pollTimedOut(logger, DateTime.Now, connectionId, requestId, null);
}
}
public static void LongPollingWritingMessage(this ILogger logger, string connectionId, string requestId, int count)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_longPollingWritingMessage(logger, DateTime.Now, connectionId, requestId, count, null);
}
}
public static void LongPollingDisconnected(this ILogger logger, string connectionId, string requestId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_longPollingDisconnected(logger, DateTime.Now, connectionId, requestId, null);
}
}
public static void LongPollingTerminated(this ILogger logger, string connectionId, string requestId, Exception ex)
{
if (logger.IsEnabled(LogLevel.Error))
{
_longPollingTerminated(logger, DateTime.Now, connectionId, requestId, ex);
}
}
public static void ConnectionDisposed(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_connectionDisposed(logger, DateTime.Now, connectionId, null);
}
}
public static void ConnectionAlreadyActive(this ILogger logger, string connectionId, string requestId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_connectionAlreadyActive(logger, DateTime.Now, connectionId, requestId, null);
}
}
public static void PollCanceled(this ILogger logger, string connectionId, string requestId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_pollCanceled(logger, DateTime.Now, connectionId, requestId, null);
}
}
public static void EstablishedConnection(this ILogger logger, string connectionId, string requestId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_establishedConnection(logger, DateTime.Now, connectionId, requestId, null);
}
}
public static void ResumingConnection(this ILogger logger, string connectionId, string requestId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_resumingConnection(logger, DateTime.Now, connectionId, requestId, null);
}
}
public static void ReceivedBytes(this ILogger logger, string connectionId, int count)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_receivedBytes(logger, DateTime.Now, connectionId, count, null);
}
}
public static void TransportNotSupported(this ILogger logger, string connectionId, TransportType transport)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_transportNotSupported(logger, DateTime.Now, connectionId, transport, null);
}
}
public static void CannotChangeTransport(this ILogger logger, string connectionId, TransportType transport, TransportType requestTransport)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_cannotChangeTransport(logger, DateTime.Now, connectionId, transport, requestTransport, null);
}
}
public static void NegotiationRequest(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_negotiationRequest(logger, DateTime.Now, connectionId, null);
}
}
public static void SocketOpened(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Information))
{
_socketOpened(logger, DateTime.Now, connectionId, null);
}
}
public static void SocketClosed(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Information))
{
_socketClosed(logger, DateTime.Now, connectionId, null);
}
}
public static void ClientClosed(this ILogger logger, string connectionId, WebSocketCloseStatus? closeStatus, string closeDescription)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_clientClosed(logger, DateTime.Now, connectionId, closeStatus, closeDescription, null);
}
}
public static void WaitingForSend(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_waitingForSend(logger, DateTime.Now, connectionId, null);
}
}
public static void FailedSending(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_failedSending(logger, DateTime.Now, connectionId, null);
}
}
public static void FinishedSending(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_finishedSending(logger, DateTime.Now, connectionId, null);
}
}
public static void WaitingForClose(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_waitingForClose(logger, DateTime.Now, connectionId, null);
}
}
public static void CloseTimedOut(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_closeTimedOut(logger, DateTime.Now, connectionId, null);
}
}
public static void MessageReceived(this ILogger logger, string connectionId, WebSocketMessageType type, int size, bool endOfMessage)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_messageReceived(logger, DateTime.Now, connectionId, type, size, endOfMessage, null);
}
}
public static void MessageToApplication(this ILogger logger, string connectionId, int size)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_messageToApplication(logger, DateTime.Now, connectionId, size, null);
}
}
public static void SendPayload(this ILogger logger, string connectionId, int size)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_sendPayload(logger, DateTime.Now, connectionId, size, null);
}
}
public static void ErrorWritingFrame(this ILogger logger, string connectionId, Exception ex)
{
if (logger.IsEnabled(LogLevel.Error))
{
_errorWritingFrame(logger, DateTime.Now, connectionId, ex);
}
}
public static void SSEWritingMessage(this ILogger logger, string connectionId, int count)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_sseWritingMessage(logger, DateTime.Now, connectionId, count, null);
}
}
}
}

View File

@ -5,7 +5,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
namespace Microsoft.AspNetCore.Sockets.Transports
namespace Microsoft.AspNetCore.Sockets.Internal.Transports
{
public interface IHttpTransport
{

View File

@ -9,18 +9,20 @@ using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Transports
namespace Microsoft.AspNetCore.Sockets.Internal.Transports
{
public class LongPollingTransport : IHttpTransport
{
private readonly ReadableChannel<byte[]> _application;
private readonly ILogger _logger;
private readonly CancellationToken _timeoutToken;
private readonly string _connectionId;
public LongPollingTransport(CancellationToken timeoutToken, ReadableChannel<byte[]> application, ILoggerFactory loggerFactory)
public LongPollingTransport(CancellationToken timeoutToken, ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
{
_timeoutToken = timeoutToken;
_application = application;
_connectionId = connectionId;
_logger = loggerFactory.CreateLogger<LongPollingTransport>();
}
@ -31,7 +33,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
if (!await _application.WaitToReadAsync(token))
{
await _application.Completion;
_logger.LogInformation("Terminating Long Polling connection by sending 204 response.");
_logger.LongPolling204(_connectionId, context.TraceIdentifier);
context.Response.StatusCode = StatusCodes.Status204NoContent;
return;
}
@ -47,7 +49,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
contentLength += buffer.Length;
buffers.Add(buffer);
_logger.LogDebug("Writing {0} byte message to response", buffer.Length);
_logger.LongPollingWritingMessage(_connectionId, context.TraceIdentifier, buffer.Length);
}
context.Response.ContentLength = contentLength;
@ -69,12 +71,12 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
// Don't count this as cancellation, this is normal as the poll can end due to the browser closing.
// The background thread will eventually dispose this connection if it's inactive
_logger.LogDebug("Client disconnected from Long Polling endpoint.");
_logger.LongPollingDisconnected(_connectionId, context.TraceIdentifier);
}
// Case 2
else if (_timeoutToken.IsCancellationRequested)
{
_logger.LogInformation("Poll request timed out. Sending 200 response.");
_logger.PollTimedOut(_connectionId, context.TraceIdentifier);
context.Response.ContentLength = 0;
context.Response.StatusCode = StatusCodes.Status200OK;
@ -82,13 +84,13 @@ namespace Microsoft.AspNetCore.Sockets.Transports
else
{
// Case 3
_logger.LogInformation("Terminating Long Polling connection by sending 204 response.");
_logger.LongPolling204(_connectionId, context.TraceIdentifier);
context.Response.StatusCode = StatusCodes.Status204NoContent;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Long Polling transport was terminated due to an error");
_logger.LongPollingTerminated(_connectionId, context.TraceIdentifier, ex);
throw;
}
}

View File

@ -13,16 +13,18 @@ using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Transports
namespace Microsoft.AspNetCore.Sockets.Internal.Transports
{
public class ServerSentEventsTransport : IHttpTransport
{
private readonly ReadableChannel<byte[]> _application;
private readonly string _connectionId;
private readonly ILogger _logger;
public ServerSentEventsTransport(ReadableChannel<byte[]> application, ILoggerFactory loggerFactory)
public ServerSentEventsTransport(ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
{
_application = application;
_connectionId = connectionId;
_logger = loggerFactory.CreateLogger<ServerSentEventsTransport>();
}
@ -51,6 +53,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
while (_application.TryRead(out var buffer))
{
_logger.SSEWritingMessage(_connectionId, buffer.Length);
if (!ServerSentEventsMessageFormatter.TryWriteMessage(buffer, output))
{
// We ran out of space to write, even after trying to enlarge.

View File

@ -10,15 +10,16 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Transports
namespace Microsoft.AspNetCore.Sockets.Internal.Transports
{
public class WebSocketsTransport : IHttpTransport
{
private readonly WebSocketOptions _options;
private readonly ILogger _logger;
private readonly IChannelConnection<byte[]> _application;
private readonly string _connectionId;
public WebSocketsTransport(WebSocketOptions options, IChannelConnection<byte[]> application, ILoggerFactory loggerFactory)
public WebSocketsTransport(WebSocketOptions options, IChannelConnection<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
{
if (options == null)
{
@ -37,6 +38,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
_options = options;
_application = application;
_connectionId = connectionId;
_logger = loggerFactory.CreateLogger<WebSocketsTransport>();
}
@ -46,11 +48,11 @@ namespace Microsoft.AspNetCore.Sockets.Transports
using (var ws = await context.WebSockets.AcceptWebSocketAsync())
{
_logger.LogInformation("Socket opened.");
_logger.SocketOpened(_connectionId);
await ProcessSocketAsync(ws);
}
_logger.LogInformation("Socket closed.");
_logger.SocketClosed(_connectionId);
}
public async Task ProcessSocketAsync(WebSocket socket)
@ -79,11 +81,11 @@ namespace Microsoft.AspNetCore.Sockets.Transports
// Shutting down because we received a close frame from the client.
// Complete the input writer so that the application knows there won't be any more input.
_logger.LogDebug("Client closed connection with status code '{status}' ({description}). Signaling end-of-input to application", receiving.Result.CloseStatus, receiving.Result.CloseStatusDescription);
_logger.ClientClosed(_connectionId, receiving.Result.CloseStatus, receiving.Result.CloseStatusDescription);
_application.Output.TryComplete();
// Wait for the application to finish sending.
_logger.LogDebug("Waiting for the application to finish sending data");
_logger.WaitingForSend(_connectionId);
await sending;
// Send the server's close frame
@ -94,13 +96,20 @@ namespace Microsoft.AspNetCore.Sockets.Transports
var failed = sending.IsFaulted || _application.Input.Completion.IsFaulted;
// The application finished sending. Close our end of the connection
_logger.LogDebug(!failed ? "Application finished sending. Sending close frame." : "Application failed during sending. Sending InternalServerError close frame");
if (failed)
{
_logger.FailedSending(_connectionId);
}
else
{
_logger.FinishedSending(_connectionId);
}
await socket.CloseOutputAsync(!failed ? WebSocketCloseStatus.NormalClosure : WebSocketCloseStatus.InternalServerError, "", CancellationToken.None);
// Now trigger the exception from the application, if there was one.
sending.GetAwaiter().GetResult();
_logger.LogDebug("Waiting for the client to close the socket");
_logger.WaitingForClose(_connectionId);
// Wait for the client to close or wait for the close timeout
var resultTask = await Task.WhenAny(receiving, Task.Delay(_options.CloseTimeout));
@ -108,7 +117,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
// We timed out waiting for the transport to close so abort the connection so we don't attempt to write anything else
if (resultTask != receiving)
{
_logger.LogDebug("Timed out waiting for client to send the close frame, aborting the connection.");
_logger.CloseTimedOut(_connectionId);
socket.Abort();
}
@ -138,8 +147,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
return receiveResult;
}
_logger.LogDebug("Message received. Type: {messageType}, size: {size}, EndOfMessage: {endOfMessage}",
receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage);
_logger.MessageReceived(_connectionId, receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage);
var truncBuffer = new ArraySegment<byte>(buffer.Array, 0, receiveResult.Count);
incomingMessage.Add(truncBuffer);
@ -169,7 +177,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
Buffer.BlockCopy(incomingMessage[0].Array, incomingMessage[0].Offset, messageBuffer, 0, incomingMessage[0].Count);
}
_logger.LogInformation("Passing message to application. Payload size: {length}", messageBuffer.Length);
_logger.MessageToApplication(_connectionId, messageBuffer.Length);
while (await _application.Output.WaitToWriteAsync())
{
if (_application.Output.TryWrite(messageBuffer))
@ -192,16 +200,13 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Sending payload: {size}", buffer.Length);
}
_logger.SendPayload(_connectionId, buffer.Length);
await ws.SendAsync(new ArraySegment<byte>(buffer), _options.WebSocketMessageType, endOfMessage: true, cancellationToken: CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError("Error writing frame to output: {0}", ex);
_logger.ErrorWritingFrame(_connectionId, ex);
break;
}
}

View File

@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Text;
using System.Threading;
@ -9,7 +8,7 @@ using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Transports;
using Microsoft.AspNetCore.Sockets.Internal.Transports;
using Microsoft.Extensions.Logging;
using Xunit;
@ -22,7 +21,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var poll = new LongPollingTransport(CancellationToken.None, channel, new LoggerFactory());
var poll = new LongPollingTransport(CancellationToken.None, channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
Assert.True(channel.Out.TryComplete());
@ -37,7 +36,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var timeoutToken = new CancellationToken(true);
var poll = new LongPollingTransport(timeoutToken, channel, new LoggerFactory());
var poll = new LongPollingTransport(timeoutToken, channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken, context.RequestAborted))
{
@ -53,7 +52,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var poll = new LongPollingTransport(CancellationToken.None, channel, new LoggerFactory());
var poll = new LongPollingTransport(CancellationToken.None, channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
var ms = new MemoryStream();
context.Response.Body = ms;
@ -73,7 +72,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var poll = new LongPollingTransport(CancellationToken.None, channel, new LoggerFactory());
var poll = new LongPollingTransport(CancellationToken.None, channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
var ms = new MemoryStream();
context.Response.Body = ms;

View File

@ -1,15 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Sockets.Transports;
using Microsoft.AspNetCore.Sockets.Internal.Transports;
using Microsoft.Extensions.Logging;
using Xunit;
@ -22,7 +20,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var sse = new ServerSentEventsTransport(channel, new LoggerFactory());
var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
Assert.True(channel.Out.TryComplete());
@ -39,7 +37,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var context = new DefaultHttpContext();
var feature = new HttpBufferingFeature();
context.Features.Set<IHttpBufferingFeature>(feature);
var sse = new ServerSentEventsTransport(channel, new LoggerFactory());
var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
Assert.True(channel.Out.TryComplete());
@ -56,7 +54,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
var channel = Channel.CreateUnbounded<byte[]>();
var context = new DefaultHttpContext();
var sse = new ServerSentEventsTransport(channel, new LoggerFactory());
var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
var ms = new MemoryStream();
context.Response.Body = ms;

View File

@ -9,7 +9,7 @@ using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.AspNetCore.Sockets.Transports;
using Microsoft.AspNetCore.Sockets.Internal.Transports;
using Microsoft.Extensions.Logging;
using Xunit;
@ -30,7 +30,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, new LoggerFactory());
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
@ -74,7 +74,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions() { WebSocketMessageType = webSocketMessageType }, transportSide, new LoggerFactory());
var ws = new WebSocketsTransport(new WebSocketOptions() { WebSocketMessageType = webSocketMessageType }, transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
@ -111,7 +111,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions() { WebSocketMessageType = webSocketMessageType }, transportSide, new LoggerFactory());
var ws = new WebSocketsTransport(new WebSocketOptions() { WebSocketMessageType = webSocketMessageType }, transportSide,
connectionId: string.Empty, loggerFactory: new LoggerFactory());
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
@ -150,7 +151,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, new LoggerFactory());
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
@ -177,7 +178,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, new LoggerFactory());
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
@ -214,7 +215,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
CloseTimeout = TimeSpan.FromSeconds(1)
};
var ws = new WebSocketsTransport(options, transportSide, new LoggerFactory());
var ws = new WebSocketsTransport(options, transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it