Add connection ID scope to HubConnection (#2394)

This commit is contained in:
James Newton-King 2018-06-01 11:12:51 +12:00 committed by GitHub
parent f309c303db
commit 4f85ca2b1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 331 additions and 123 deletions

View File

@ -46,7 +46,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
private readonly ITransportFactory _transportFactory;
private string _connectionId;
private readonly ConnectionLogScope _logScope;
private readonly IDisposable _scopeDisposable;
private readonly ILoggerFactory _loggerFactory;
private Func<Task<string>> _accessTokenProvider;
@ -150,7 +149,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
_transportFactory = new DefaultTransportFactory(httpConnectionOptions.Transports, _loggerFactory, _httpClient, httpConnectionOptions, GetAccessTokenAsync);
_logScope = new ConnectionLogScope();
_scopeDisposable = _logger.BeginScope(_logScope);
Features.Set<IConnectionInherentKeepAliveFeature>(this);
}
@ -188,7 +186,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
/// </remarks>
public async Task StartAsync(TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
await StartAsyncCore(transferFormat).ForceAsync();
using (_logger.BeginScope(_logScope))
{
await StartAsyncCore(transferFormat).ForceAsync();
}
}
private async Task StartAsyncCore(TransferFormat transferFormat)
@ -233,7 +234,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
/// A connection cannot be restarted after it has stopped. To restart a connection
/// a new instance should be created using the same options.
/// </remarks>
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
public async Task DisposeAsync()
{
using (_logger.BeginScope(_logScope))
{
await DisposeAsyncCore().ForceAsync();
}
}
private async Task DisposeAsyncCore()
{
@ -274,7 +281,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
// We want to do these things even if the WaitForWriterToComplete/WaitForReaderToComplete fails
if (!_disposed)
{
_scopeDisposable.Dispose();
_disposed = true;
}

View File

@ -10,6 +10,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
internal class ConnectionLogScope : IReadOnlyList<KeyValuePair<string, object>>
{
// Name chosen so as not to collide with Kestrel's "ConnectionId"
private const string ClientConnectionIdKey = "ClientConnectionId";
private string _cachedToString;
private string _connectionId;
@ -29,7 +32,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
if (Count == 1 && index == 0)
{
return new KeyValuePair<string, object>("ClientConnectionId", ConnectionId);
return new KeyValuePair<string, object>(ClientConnectionIdKey, ConnectionId);
}
throw new ArgumentOutOfRangeException(nameof(index));
@ -57,14 +60,11 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
if (!string.IsNullOrEmpty(ConnectionId))
{
_cachedToString = string.Format(
CultureInfo.InvariantCulture,
"ClientConnectionId:{0}",
ConnectionId);
_cachedToString = FormattableString.Invariant($"{ClientConnectionIdKey}:{ConnectionId}");
}
}
return _cachedToString;
return _cachedToString ?? string.Empty;
}
}
}

View File

@ -49,6 +49,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
private long _nextActivationSendPing;
private bool _disposed;
private readonly ConnectionLogScope _logScope;
// Transient state to a connection
private ConnectionState _connectionState;
@ -125,6 +127,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<HubConnection>();
_logScope = new ConnectionLogScope();
}
/// <summary>
@ -135,7 +139,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
public async Task StartAsync(CancellationToken cancellationToken = default)
{
CheckDisposed();
await StartAsyncCore(cancellationToken).ForceAsync();
using (_logger.BeginScope(_logScope))
{
await StartAsyncCore(cancellationToken).ForceAsync();
}
}
/// <summary>
@ -146,7 +153,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
public async Task StopAsync(CancellationToken cancellationToken = default)
{
CheckDisposed();
await StopAsyncCore(disposing: false).ForceAsync();
using (_logger.BeginScope(_logScope))
{
await StopAsyncCore(disposing: false).ForceAsync();
}
}
// Current plan for IAsyncDisposable is that DisposeAsync will NOT take a CancellationToken
@ -159,7 +169,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
if (!_disposed)
{
await StopAsyncCore(disposing: true).ForceAsync();
using (_logger.BeginScope(_logScope))
{
await StopAsyncCore(disposing: true).ForceAsync();
}
}
}
@ -221,8 +234,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
/// <remarks>
/// This is a low level method for invoking a streaming hub method on the server. Using an <see cref="HubConnectionExtensions"/> <c>StreamAsChannelAsync</c> extension method is recommended.
/// </remarks>
public async Task<ChannelReader<object>> StreamAsChannelCoreAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) =>
await StreamAsChannelCoreAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
public async Task<ChannelReader<object>> StreamAsChannelCoreAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
{
using (_logger.BeginScope(_logScope))
{
return await StreamAsChannelCoreAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
}
}
/// <summary>
/// Invokes a hub method on the server using the specified method name, return type and arguments.
@ -238,8 +256,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
/// <remarks>
/// This is a low level method for invoking a hub method on the server. Using an <see cref="HubConnectionExtensions"/> <c>InvokeAsync</c> extension method is recommended.
/// </remarks>
public async Task<object> InvokeCoreAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) =>
await InvokeCoreAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
public async Task<object> InvokeCoreAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
{
using (_logger.BeginScope(_logScope))
{
return await InvokeCoreAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
}
}
/// <summary>
/// Invokes a hub method on the server using the specified method name and arguments.
@ -252,8 +275,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
/// <remarks>
/// This is a low level method for invoking a hub method on the server. Using an <see cref="HubConnectionExtensions"/> <c>SendAsync</c> extension method is recommended.
/// </remarks>
public async Task SendCoreAsync(string methodName, object[] args, CancellationToken cancellationToken = default) =>
await SendCoreAsyncCore(methodName, args, cancellationToken).ForceAsync();
public async Task SendCoreAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
{
using (_logger.BeginScope(_logScope))
{
await SendCoreAsyncCore(methodName, args, cancellationToken).ForceAsync();
}
}
private async Task StartAsyncCore(CancellationToken cancellationToken)
{
@ -1086,6 +1114,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public ConnectionState(ConnectionContext connection, HubConnection hubConnection)
{
_hubConnection = hubConnection;
_hubConnection._logScope.ConnectionId = connection.ConnectionId;
Connection = connection;
}
@ -1181,6 +1210,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
await ReceiveTask;
Log.Stopped(_hubConnection._logger);
_hubConnection._logScope.ConnectionId = null;
_stopTcs.TrySetResult(null);
}

View File

@ -0,0 +1,70 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
namespace Microsoft.AspNetCore.SignalR.Client.Internal
{
internal class ConnectionLogScope : IReadOnlyList<KeyValuePair<string, object>>
{
// Name chosen so as not to collide with Kestrel's "ConnectionId"
private const string ClientConnectionIdKey = "ClientConnectionId";
private string _cachedToString;
private string _connectionId;
public string ConnectionId
{
get => _connectionId;
set
{
_cachedToString = null;
_connectionId = value;
}
}
public KeyValuePair<string, object> this[int index]
{
get
{
if (Count == 1 && index == 0)
{
return new KeyValuePair<string, object>(ClientConnectionIdKey, ConnectionId);
}
throw new ArgumentOutOfRangeException(nameof(index));
}
}
public int Count => string.IsNullOrEmpty(ConnectionId) ? 0 : 1;
public IEnumerator<KeyValuePair<string, object>> GetEnumerator()
{
for (var i = 0; i < Count; ++i)
{
yield return this[i];
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public override string ToString()
{
if (_cachedToString == null)
{
if (!string.IsNullOrEmpty(ConnectionId))
{
_cachedToString = FormattableString.Invariant($"{ClientConnectionIdKey}:{ConnectionId}");
}
}
return _cachedToString ?? string.Empty;
}
}
}

View File

@ -0,0 +1,77 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
namespace Microsoft.AspNetCore.SignalR.Tests
{
// TestSink does not have an event
internal class LogSinkProvider : ILoggerProvider
{
private readonly ConcurrentQueue<LogRecord> _logs = new ConcurrentQueue<LogRecord>();
public event Action<LogRecord> RecordLogged;
public ILogger CreateLogger(string categoryName)
{
return new LogSinkLogger(categoryName, this);
}
public void Dispose()
{
}
public IList<LogRecord> GetLogs() => _logs.ToList();
public void Log<TState>(string categoryName, LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
var record = new LogRecord(
DateTime.Now,
new WriteContext
{
LoggerName = categoryName,
LogLevel = logLevel,
EventId = eventId,
State = state,
Exception = exception,
Formatter = (o, e) => formatter((TState)o, e),
});
_logs.Enqueue(record);
RecordLogged?.Invoke(record);
}
private class LogSinkLogger : ILogger
{
private readonly string _categoryName;
private readonly LogSinkProvider _logSinkProvider;
public LogSinkLogger(string categoryName, LogSinkProvider logSinkProvider)
{
_categoryName = categoryName;
_logSinkProvider = logSinkProvider;
}
public IDisposable BeginScope<TState>(TState state)
{
return null;
}
public bool IsEnabled(LogLevel logLevel)
{
return true;
}
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
_logSinkProvider.Log(_categoryName, logLevel, eventId, state, exception, formatter);
}
}
}
}

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@ -71,6 +70,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
_loggerFactory = loggerFactory;
}
_loggerFactory = new WrappingLoggerFactory(_loggerFactory);
_loggerFactory.AddProvider(_logSinkProvider);
_logger = _loggerFactory.CreateLogger<ServerFixture<TStartup>>();
StartServer();
@ -84,7 +85,6 @@ namespace Microsoft.AspNetCore.SignalR.Tests
_host = new WebHostBuilder()
.ConfigureLogging(builder => builder
.SetMinimumLevel(LogLevel.Trace)
.AddProvider(_logSinkProvider)
.AddProvider(new ForwardingLoggerProvider(_loggerFactory)))
.UseStartup(typeof(TStartup))
.UseKestrel()
@ -164,103 +164,4 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
}
// TestSink doesn't seem to be thread-safe :(.
internal class LogSinkProvider : ILoggerProvider, ISupportExternalScope
{
private readonly ConcurrentQueue<LogRecord> _logs = new ConcurrentQueue<LogRecord>();
internal IExternalScopeProvider _scopeProvider;
public event Action<LogRecord> RecordLogged;
public ILogger CreateLogger(string categoryName)
{
return new LogSinkLogger(categoryName, this);
}
public void Dispose()
{
}
public IList<LogRecord> GetLogs() => _logs.ToList();
public void Log<TState>(string categoryName, LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
var record = new LogRecord(
DateTime.Now,
new WriteContext
{
LoggerName = categoryName,
LogLevel = logLevel,
EventId = eventId,
State = state,
Exception = exception,
Formatter = (o, e) => formatter((TState)o, e),
});
_logs.Enqueue(record);
RecordLogged?.Invoke(record);
}
private class LogSinkLogger : ILogger
{
private readonly string _categoryName;
private readonly LogSinkProvider _logSinkProvider;
public LogSinkLogger(string categoryName, LogSinkProvider logSinkProvider)
{
_categoryName = categoryName;
_logSinkProvider = logSinkProvider;
}
public IDisposable BeginScope<TState>(TState state)
{
return null;
}
public bool IsEnabled(LogLevel logLevel)
{
return true;
}
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
// Build the message outside of the formatter
// Serilog doesn't appear to use the formatter and just writes the state
var connectionId = GetConnectionId();
var sb = new StringBuilder();
if (connectionId != null)
{
sb.Append(connectionId + " - ");
}
sb.Append(formatter(state, exception));
var message = sb.ToString();
_logSinkProvider.Log(_categoryName, logLevel, eventId, message, exception, (s, ex) => s);
}
private string GetConnectionId()
{
string connectionId = null;
_logSinkProvider._scopeProvider.ForEachScope<object>((scope, s) =>
{
if (scope is IReadOnlyList<KeyValuePair<string, object>> logScope)
{
var id = logScope.FirstOrDefault(kv => kv.Key == "TransportConnectionId").Value as string;
if (id != null)
{
connectionId = id;
}
}
}, null);
return connectionId;
}
}
public void SetScopeProvider(IExternalScopeProvider scopeProvider)
{
_scopeProvider = scopeProvider;
}
}
}

View File

@ -19,14 +19,20 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
var disposable = StartLog(out loggerFactory, testName);
return new VerifyNoErrorsScope(loggerFactory, disposable, expectedErrorsFilter);
return CreateScope(ref loggerFactory, disposable, expectedErrorsFilter);
}
public virtual IDisposable StartVerifiableLog(out ILoggerFactory loggerFactory, LogLevel minLogLevel, [CallerMemberName] string testName = null, Func<WriteContext, bool> expectedErrorsFilter = null)
{
var disposable = StartLog(out loggerFactory, minLogLevel, testName);
return new VerifyNoErrorsScope(loggerFactory, disposable, expectedErrorsFilter);
return CreateScope(ref loggerFactory, disposable, expectedErrorsFilter);
}
private VerifyNoErrorsScope CreateScope(ref ILoggerFactory loggerFactory, IDisposable wrappedDisposable = null, Func<WriteContext, bool> expectedErrorsFilter = null)
{
loggerFactory = new WrappingLoggerFactory(loggerFactory ?? new LoggerFactory());
return new VerifyNoErrorsScope(loggerFactory, wrappedDisposable, expectedErrorsFilter);
}
}
}

View File

@ -0,0 +1,117 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Tests
{
/// <summary>
/// A logger factory that will prepend the current SignalR connection ID to the message.
/// </summary>
public class WrappingLoggerFactory : ILoggerFactory
{
private readonly ILoggerFactory _innerLoggerFactory;
private readonly DummyProvider _provider;
public WrappingLoggerFactory(ILoggerFactory innerLoggerFactory)
{
_innerLoggerFactory = innerLoggerFactory;
_provider = new DummyProvider();
AddProvider(_provider);
}
public void Dispose()
{
_innerLoggerFactory.Dispose();
}
public ILogger CreateLogger(string categoryName)
{
return new WrappingLogger(_provider, _innerLoggerFactory.CreateLogger(categoryName));
}
public void AddProvider(ILoggerProvider provider)
{
_innerLoggerFactory.AddProvider(provider);
}
private class DummyProvider : ILoggerProvider, ISupportExternalScope
{
public IExternalScopeProvider ScopeProvider { get; private set; }
public void Dispose()
{
}
public ILogger CreateLogger(string categoryName)
{
return NullLogger.Instance;
}
public void SetScopeProvider(IExternalScopeProvider scopeProvider)
{
ScopeProvider = scopeProvider;
}
}
private class WrappingLogger : ILogger
{
private readonly DummyProvider _provider;
private readonly ILogger _logger;
public WrappingLogger(DummyProvider provider, ILogger logger)
{
_provider = provider;
_logger = logger;
}
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
// Build the message outside of the formatter
// Serilog doesn't appear to use the formatter and just writes the state
var connectionId = GetConnectionId();
var sb = new StringBuilder();
if (connectionId != null)
{
sb.Append(connectionId + " - ");
}
sb.Append(formatter(state, exception));
var message = sb.ToString();
_logger.Log(logLevel, eventId, message, exception, (s, ex) => s);
}
public bool IsEnabled(LogLevel logLevel)
{
return _logger.IsEnabled(logLevel);
}
public IDisposable BeginScope<TState>(TState state)
{
return _logger.BeginScope(state);
}
private string GetConnectionId()
{
string connectionId = null;
_provider.ScopeProvider?.ForEachScope<object>((scope, s) =>
{
if (scope is IReadOnlyList<KeyValuePair<string, object>> logScope)
{
if (logScope.FirstOrDefault(kv => kv.Key == "TransportConnectionId" || kv.Key == "ClientConnectionId").Value is string id)
{
connectionId = id;
}
}
}, null);
return connectionId;
}
}
}
}