Progress towards deleting Sockets.Abstractions (#1705)
* Progress towards deleting Sockets.Abstractions - Moved our custom DefaultConnectionContext to Sockets.Http and renamed it to HttpConnectionContext. - Renamed ConnectionManager to HttpConnectionManager - Use DefaultConnection in tests and benchmarks - Delete ConnectionMetadata
This commit is contained in:
parent
a9667c3fbd
commit
cb05ce4e0b
|
|
@ -4,6 +4,7 @@ using System.Threading;
|
|||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
using BenchmarkDotNet.Attributes;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.SignalR.Internal;
|
||||
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
|
||||
using Microsoft.AspNetCore.Sockets;
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
|
|||
new Logger<DefaultHubDispatcher<TestHub>>(NullLoggerFactory.Instance));
|
||||
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new Sockets.DefaultConnectionContext(Guid.NewGuid().ToString(), pair.Application, pair.Transport);
|
||||
var connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), pair.Application, pair.Transport);
|
||||
|
||||
_connectionContext = new NoErrorHubConnectionContext(connection, TimeSpan.Zero, NullLoggerFactory.Instance);
|
||||
|
||||
|
|
|
|||
|
|
@ -22,11 +22,11 @@
|
|||
<MicrosoftAspNetCoreHttpPackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreHttpPackageVersion>
|
||||
<MicrosoftAspNetCoreIdentityEntityFrameworkCorePackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreIdentityEntityFrameworkCorePackageVersion>
|
||||
<MicrosoftAspNetCoreMvcPackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreMvcPackageVersion>
|
||||
<MicrosoftAspNetCoreConnectionsAbstractionsPackageVersion>2.1.0-a-preview2-bedrock-renames-17598</MicrosoftAspNetCoreConnectionsAbstractionsPackageVersion>
|
||||
<MicrosoftAspNetCoreConnectionsAbstractionsPackageVersion>2.1.0-a-preview2-default-connection-context-17612</MicrosoftAspNetCoreConnectionsAbstractionsPackageVersion>
|
||||
<MicrosoftAspNetCoreRoutingPackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreRoutingPackageVersion>
|
||||
<MicrosoftAspNetCoreServerIISIntegrationPackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreServerIISIntegrationPackageVersion>
|
||||
<MicrosoftAspNetCoreServerIntegrationTestingPackageVersion>0.5.0-preview2-30355</MicrosoftAspNetCoreServerIntegrationTestingPackageVersion>
|
||||
<MicrosoftAspNetCoreServerKestrelPackageVersion>2.1.0-a-preview2-bedrock-renames-17598</MicrosoftAspNetCoreServerKestrelPackageVersion>
|
||||
<MicrosoftAspNetCoreServerKestrelPackageVersion>2.1.0-a-preview2-default-connection-context-17612</MicrosoftAspNetCoreServerKestrelPackageVersion>
|
||||
<MicrosoftAspNetCoreStaticFilesPackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreStaticFilesPackageVersion>
|
||||
<MicrosoftAspNetCoreTestHostPackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreTestHostPackageVersion>
|
||||
<MicrosoftAspNetCoreTestingPackageVersion>2.1.0-preview2-30355</MicrosoftAspNetCoreTestingPackageVersion>
|
||||
|
|
|
|||
|
|
@ -1,119 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace Microsoft.AspNetCore.Sockets
|
||||
{
|
||||
internal class ConnectionMetadata : IDictionary<object, object>
|
||||
{
|
||||
public ConnectionMetadata()
|
||||
: this(new ConcurrentDictionary<object, object>())
|
||||
{
|
||||
}
|
||||
|
||||
public ConnectionMetadata(IDictionary<object, object> items)
|
||||
{
|
||||
Items = items;
|
||||
}
|
||||
|
||||
public IDictionary<object, object> Items { get; }
|
||||
|
||||
// Replace the indexer with one that returns null for missing values
|
||||
object IDictionary<object, object>.this[object key]
|
||||
{
|
||||
get
|
||||
{
|
||||
if (Items.TryGetValue(key, out var value))
|
||||
{
|
||||
return value;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
set { Items[key] = value; }
|
||||
}
|
||||
|
||||
void IDictionary<object, object>.Add(object key, object value)
|
||||
{
|
||||
Items.Add(key, value);
|
||||
}
|
||||
|
||||
bool IDictionary<object, object>.ContainsKey(object key)
|
||||
{
|
||||
return Items.ContainsKey(key);
|
||||
}
|
||||
|
||||
ICollection<object> IDictionary<object, object>.Keys
|
||||
{
|
||||
get { return Items.Keys; }
|
||||
}
|
||||
|
||||
bool IDictionary<object, object>.Remove(object key)
|
||||
{
|
||||
return Items.Remove(key);
|
||||
}
|
||||
|
||||
bool IDictionary<object, object>.TryGetValue(object key, out object value)
|
||||
{
|
||||
return Items.TryGetValue(key, out value);
|
||||
}
|
||||
|
||||
ICollection<object> IDictionary<object, object>.Values
|
||||
{
|
||||
get { return Items.Values; }
|
||||
}
|
||||
|
||||
void ICollection<KeyValuePair<object, object>>.Add(KeyValuePair<object, object> item)
|
||||
{
|
||||
Items.Add(item);
|
||||
}
|
||||
|
||||
void ICollection<KeyValuePair<object, object>>.Clear()
|
||||
{
|
||||
Items.Clear();
|
||||
}
|
||||
|
||||
bool ICollection<KeyValuePair<object, object>>.Contains(KeyValuePair<object, object> item)
|
||||
{
|
||||
return Items.Contains(item);
|
||||
}
|
||||
|
||||
void ICollection<KeyValuePair<object, object>>.CopyTo(KeyValuePair<object, object>[] array, int arrayIndex)
|
||||
{
|
||||
Items.CopyTo(array, arrayIndex);
|
||||
}
|
||||
|
||||
int ICollection<KeyValuePair<object, object>>.Count
|
||||
{
|
||||
get { return Items.Count; }
|
||||
}
|
||||
|
||||
bool ICollection<KeyValuePair<object, object>>.IsReadOnly
|
||||
{
|
||||
get { return Items.IsReadOnly; }
|
||||
}
|
||||
|
||||
bool ICollection<KeyValuePair<object, object>>.Remove(KeyValuePair<object, object> item)
|
||||
{
|
||||
object value;
|
||||
if (Items.TryGetValue(item.Key, out value) && Equals(item.Value, value))
|
||||
{
|
||||
return Items.Remove(item.Key);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
IEnumerator<KeyValuePair<object, object>> IEnumerable<KeyValuePair<object, object>>.GetEnumerator()
|
||||
{
|
||||
return Items.GetEnumerator();
|
||||
}
|
||||
|
||||
IEnumerator IEnumerable.GetEnumerator()
|
||||
{
|
||||
return Items.GetEnumerator();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -5,5 +5,7 @@
|
|||
<TargetFramework>netstandard2.0</TargetFramework>
|
||||
<RootNamespace>Microsoft.AspNetCore.Sockets</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.AspNetCore.Connections.Abstractions" Version="$(MicrosoftAspNetCoreConnectionsAbstractionsPackageVersion)" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
services.AddRouting();
|
||||
services.AddAuthorizationPolicyEvaluator();
|
||||
services.TryAddSingleton<HttpConnectionDispatcher>();
|
||||
services.TryAddSingleton<ConnectionManager>();
|
||||
services.TryAddSingleton<HttpConnectionManager>();
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.IO.Pipelines;
|
||||
using System.Security.Claims;
|
||||
|
|
@ -10,17 +11,20 @@ using System.Threading.Tasks;
|
|||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Connections.Features;
|
||||
using Microsoft.AspNetCore.Sockets.Http.Features;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
|
||||
namespace Microsoft.AspNetCore.Sockets
|
||||
{
|
||||
public class DefaultConnectionContext : ConnectionContext,
|
||||
IConnectionIdFeature,
|
||||
IConnectionItemsFeature,
|
||||
IConnectionTransportFeature,
|
||||
IApplicationTransportFeature,
|
||||
IConnectionUserFeature,
|
||||
IConnectionHeartbeatFeature,
|
||||
ITransferFormatFeature
|
||||
public class HttpConnectionContext : ConnectionContext,
|
||||
IConnectionIdFeature,
|
||||
IConnectionItemsFeature,
|
||||
IConnectionTransportFeature,
|
||||
IApplicationTransportFeature,
|
||||
IConnectionUserFeature,
|
||||
IConnectionHeartbeatFeature,
|
||||
ITransferFormatFeature,
|
||||
IHttpContextFeature
|
||||
{
|
||||
private object _heartbeatLock = new object();
|
||||
private List<(Action<object> handler, object state)> _heartbeatHandlers;
|
||||
|
|
@ -34,7 +38,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
/// The caller is expected to set the <see cref="Transport"/> and <see cref="Application"/> pipes manually.
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
public DefaultConnectionContext(string id)
|
||||
public HttpConnectionContext(string id)
|
||||
{
|
||||
ConnectionId = id;
|
||||
LastSeenUtc = DateTime.UtcNow;
|
||||
|
|
@ -52,9 +56,10 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
Features.Set<IApplicationTransportFeature>(this);
|
||||
Features.Set<IConnectionHeartbeatFeature>(this);
|
||||
Features.Set<ITransferFormatFeature>(this);
|
||||
Features.Set<IHttpContextFeature>(this);
|
||||
}
|
||||
|
||||
public DefaultConnectionContext(string id, IDuplexPipe transport, IDuplexPipe application)
|
||||
public HttpConnectionContext(string id, IDuplexPipe transport, IDuplexPipe application)
|
||||
: this(id)
|
||||
{
|
||||
Transport = transport;
|
||||
|
|
@ -79,7 +84,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
|
||||
public ClaimsPrincipal User { get; set; }
|
||||
|
||||
public override IDictionary<object, object> Items { get; set; } = new ConnectionMetadata();
|
||||
public override IDictionary<object, object> Items { get; set; } = new ConnectionItems(new ConcurrentDictionary<object, object>());
|
||||
|
||||
public IDuplexPipe Application { get; set; }
|
||||
|
||||
|
|
@ -89,6 +94,8 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
|
||||
public TransferFormat ActiveFormat { get; set; }
|
||||
|
||||
public HttpContext HttpContext { get; set; }
|
||||
|
||||
public void OnHeartbeat(Action<object> action, object state)
|
||||
{
|
||||
lock (_heartbeatLock)
|
||||
|
|
@ -24,11 +24,11 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
{
|
||||
public partial class HttpConnectionDispatcher
|
||||
{
|
||||
private readonly ConnectionManager _manager;
|
||||
private readonly HttpConnectionManager _manager;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
public HttpConnectionDispatcher(ConnectionManager manager, ILoggerFactory loggerFactory)
|
||||
public HttpConnectionDispatcher(HttpConnectionManager manager, ILoggerFactory loggerFactory)
|
||||
{
|
||||
_manager = manager;
|
||||
_loggerFactory = loggerFactory;
|
||||
|
|
@ -168,7 +168,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
{
|
||||
await connection.Lock.WaitAsync();
|
||||
|
||||
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Disposed)
|
||||
if (connection.Status == HttpConnectionContext.ConnectionStatus.Disposed)
|
||||
{
|
||||
Log.ConnectionDisposed(_logger, connection.ConnectionId);
|
||||
|
||||
|
|
@ -178,7 +178,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
return;
|
||||
}
|
||||
|
||||
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Active)
|
||||
if (connection.Status == HttpConnectionContext.ConnectionStatus.Active)
|
||||
{
|
||||
var existing = connection.GetHttpContext();
|
||||
Log.ConnectionAlreadyActive(_logger, connection.ConnectionId, existing.TraceIdentifier);
|
||||
|
|
@ -196,7 +196,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
}
|
||||
|
||||
// Mark the connection as active
|
||||
connection.Status = DefaultConnectionContext.ConnectionStatus.Active;
|
||||
connection.Status = HttpConnectionContext.ConnectionStatus.Active;
|
||||
|
||||
// Raise OnConnected for new connections only since polls happen all the time
|
||||
if (connection.ApplicationTask == null)
|
||||
|
|
@ -271,12 +271,12 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
{
|
||||
await connection.Lock.WaitAsync();
|
||||
|
||||
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Active)
|
||||
if (connection.Status == HttpConnectionContext.ConnectionStatus.Active)
|
||||
{
|
||||
// Mark the connection as inactive
|
||||
connection.LastSeenUtc = DateTime.UtcNow;
|
||||
|
||||
connection.Status = DefaultConnectionContext.ConnectionStatus.Inactive;
|
||||
connection.Status = HttpConnectionContext.ConnectionStatus.Inactive;
|
||||
|
||||
// Dispose the cancellation token
|
||||
connection.Cancellation.Dispose();
|
||||
|
|
@ -295,13 +295,13 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
private async Task DoPersistentConnection(ConnectionDelegate connectionDelegate,
|
||||
IHttpTransport transport,
|
||||
HttpContext context,
|
||||
DefaultConnectionContext connection)
|
||||
HttpConnectionContext connection)
|
||||
{
|
||||
try
|
||||
{
|
||||
await connection.Lock.WaitAsync();
|
||||
|
||||
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Disposed)
|
||||
if (connection.Status == HttpConnectionContext.ConnectionStatus.Disposed)
|
||||
{
|
||||
Log.ConnectionDisposed(_logger, connection.ConnectionId);
|
||||
|
||||
|
|
@ -311,7 +311,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
}
|
||||
|
||||
// There's already an active request
|
||||
if (connection.Status == DefaultConnectionContext.ConnectionStatus.Active)
|
||||
if (connection.Status == HttpConnectionContext.ConnectionStatus.Active)
|
||||
{
|
||||
Log.ConnectionAlreadyActive(_logger, connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
|
||||
|
||||
|
|
@ -321,7 +321,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
}
|
||||
|
||||
// Mark the connection as active
|
||||
connection.Status = DefaultConnectionContext.ConnectionStatus.Active;
|
||||
connection.Status = HttpConnectionContext.ConnectionStatus.Active;
|
||||
|
||||
// Call into the end point passing the connection
|
||||
connection.ApplicationTask = ExecuteApplication(connectionDelegate, connection);
|
||||
|
|
@ -471,7 +471,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
await connection.Application.Output.FlushAsync();
|
||||
}
|
||||
|
||||
private async Task<bool> EnsureConnectionStateAsync(DefaultConnectionContext connection, HttpContext context, TransportType transportType, TransportType supportedTransports, ConnectionLogScope logScope, HttpConnectionOptions options)
|
||||
private async Task<bool> EnsureConnectionStateAsync(HttpConnectionContext connection, HttpContext context, TransportType transportType, TransportType supportedTransports, ConnectionLogScope logScope, HttpConnectionOptions options)
|
||||
{
|
||||
if ((supportedTransports & transportType) == 0)
|
||||
{
|
||||
|
|
@ -512,11 +512,11 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
// To make the IHttpContextFeature work well, we make a copy of the relevant properties
|
||||
// to a new HttpContext. This means that it's impossible to affect the context
|
||||
// with subsequent requests.
|
||||
var existing = connection.GetHttpContext();
|
||||
var existing = connection.HttpContext;
|
||||
if (existing == null)
|
||||
{
|
||||
var httpContext = CloneHttpContext(context);
|
||||
connection.SetHttpContext(httpContext);
|
||||
connection.HttpContext = httpContext;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -527,7 +527,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
}
|
||||
else
|
||||
{
|
||||
connection.SetHttpContext(context);
|
||||
connection.HttpContext = context;
|
||||
}
|
||||
|
||||
// Set the Connection ID on the logging scope so that logs from now on will have the
|
||||
|
|
@ -596,7 +596,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
return newHttpContext;
|
||||
}
|
||||
|
||||
private async Task<DefaultConnectionContext> GetConnectionAsync(HttpContext context, HttpConnectionOptions options)
|
||||
private async Task<HttpConnectionContext> GetConnectionAsync(HttpContext context, HttpConnectionOptions options)
|
||||
{
|
||||
var connectionId = GetConnectionId(context);
|
||||
|
||||
|
|
@ -623,7 +623,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
return connection;
|
||||
}
|
||||
|
||||
private void EnsureConnectionStateInternal(DefaultConnectionContext connection, HttpConnectionOptions options)
|
||||
private void EnsureConnectionStateInternal(HttpConnectionContext connection, HttpConnectionOptions options)
|
||||
{
|
||||
// If the connection doesn't have a pipe yet then create one, we lazily create the pipe to save on allocations until the client actually connects
|
||||
if (connection.Transport == null)
|
||||
|
|
@ -637,10 +637,10 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
}
|
||||
|
||||
// This is only used for WebSockets connections, which can connect directly without negotiating
|
||||
private async Task<DefaultConnectionContext> GetOrCreateConnectionAsync(HttpContext context, HttpConnectionOptions options)
|
||||
private async Task<HttpConnectionContext> GetOrCreateConnectionAsync(HttpContext context, HttpConnectionOptions options)
|
||||
{
|
||||
var connectionId = GetConnectionId(context);
|
||||
DefaultConnectionContext connection;
|
||||
HttpConnectionContext connection;
|
||||
|
||||
// There's no connection id so this is a brand new connection
|
||||
if (StringValues.IsNullOrEmpty(connectionId))
|
||||
|
|
|
|||
|
|
@ -19,20 +19,20 @@ using Microsoft.Extensions.Logging;
|
|||
|
||||
namespace Microsoft.AspNetCore.Sockets
|
||||
{
|
||||
public class ConnectionManager
|
||||
public class HttpConnectionManager
|
||||
{
|
||||
// TODO: Consider making this configurable? At least for testing?
|
||||
private static readonly TimeSpan _heartbeatTickRate = TimeSpan.FromSeconds(1);
|
||||
|
||||
private static readonly RNGCryptoServiceProvider _keyGenerator = new RNGCryptoServiceProvider();
|
||||
|
||||
private readonly ConcurrentDictionary<string, (DefaultConnectionContext Connection, ValueStopwatch Timer)> _connections = new ConcurrentDictionary<string, (DefaultConnectionContext Connection, ValueStopwatch Timer)>();
|
||||
private readonly ConcurrentDictionary<string, (HttpConnectionContext Connection, ValueStopwatch Timer)> _connections = new ConcurrentDictionary<string, (HttpConnectionContext Connection, ValueStopwatch Timer)>();
|
||||
private Timer _timer;
|
||||
private readonly ILogger<ConnectionManager> _logger;
|
||||
private readonly ILogger<HttpConnectionManager> _logger;
|
||||
private object _executionLock = new object();
|
||||
private bool _disposed;
|
||||
|
||||
public ConnectionManager(ILogger<ConnectionManager> logger, IApplicationLifetime appLifetime)
|
||||
public HttpConnectionManager(ILogger<HttpConnectionManager> logger, IApplicationLifetime appLifetime)
|
||||
{
|
||||
_logger = logger;
|
||||
appLifetime.ApplicationStarted.Register(() => Start());
|
||||
|
|
@ -55,7 +55,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
}
|
||||
}
|
||||
|
||||
public bool TryGetConnection(string id, out DefaultConnectionContext connection)
|
||||
public bool TryGetConnection(string id, out HttpConnectionContext connection)
|
||||
{
|
||||
connection = null;
|
||||
|
||||
|
|
@ -71,20 +71,20 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
/// Creates a connection without Pipes setup to allow saving allocations until Pipes are needed.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public DefaultConnectionContext CreateConnection()
|
||||
public HttpConnectionContext CreateConnection()
|
||||
{
|
||||
var id = MakeNewConnectionId();
|
||||
|
||||
_logger.CreatedNewConnection(id);
|
||||
var connectionTimer = SocketEventSource.Log.ConnectionStart(id);
|
||||
|
||||
var connection = new DefaultConnectionContext(id);
|
||||
var connection = new HttpConnectionContext(id);
|
||||
|
||||
_connections.TryAdd(id, (connection, connectionTimer));
|
||||
return connection;
|
||||
}
|
||||
|
||||
public DefaultConnectionContext CreateConnection(PipeOptions transportPipeOptions, PipeOptions appPipeOptions)
|
||||
public HttpConnectionContext CreateConnection(PipeOptions transportPipeOptions, PipeOptions appPipeOptions)
|
||||
{
|
||||
var connection = CreateConnection();
|
||||
var pair = DuplexPipe.CreateConnectionPair(transportPipeOptions, appPipeOptions);
|
||||
|
|
@ -116,7 +116,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
|
||||
private static void Scan(object state)
|
||||
{
|
||||
((ConnectionManager)state).Scan();
|
||||
((HttpConnectionManager)state).Scan();
|
||||
}
|
||||
|
||||
public void Scan()
|
||||
|
|
@ -148,7 +148,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
// Scan the registered connections looking for ones that have timed out
|
||||
foreach (var c in _connections)
|
||||
{
|
||||
var status = DefaultConnectionContext.ConnectionStatus.Inactive;
|
||||
var status = HttpConnectionContext.ConnectionStatus.Inactive;
|
||||
var lastSeenUtc = DateTimeOffset.UtcNow;
|
||||
var connection = c.Value.Connection;
|
||||
|
||||
|
|
@ -168,7 +168,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
|
||||
// Once the decision has been made to dispose we don't check the status again
|
||||
// But don't clean up connections while the debugger is attached.
|
||||
if (!Debugger.IsAttached && status == DefaultConnectionContext.ConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
|
||||
if (!Debugger.IsAttached && status == HttpConnectionContext.ConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
|
||||
{
|
||||
_logger.ConnectionTimedOut(connection.ConnectionId);
|
||||
SocketEventSource.Log.ConnectionTimedOut(connection.ConnectionId);
|
||||
|
|
@ -221,7 +221,7 @@ namespace Microsoft.AspNetCore.Sockets
|
|||
}
|
||||
}
|
||||
|
||||
public async Task DisposeAndRemoveAsync(DefaultConnectionContext connection)
|
||||
public async Task DisposeAndRemoveAsync(HttpConnectionContext connection)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
|
@ -19,10 +19,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
|
|||
private readonly WebSocketOptions _options;
|
||||
private readonly ILogger _logger;
|
||||
private readonly IDuplexPipe _application;
|
||||
private readonly DefaultConnectionContext _connection;
|
||||
private readonly HttpConnectionContext _connection;
|
||||
private volatile bool _aborted;
|
||||
|
||||
public WebSocketsTransport(WebSocketOptions options, IDuplexPipe application, DefaultConnectionContext connection, ILoggerFactory loggerFactory)
|
||||
public WebSocketsTransport(WebSocketOptions options, IDuplexPipe application, HttpConnectionContext connection, ILoggerFactory loggerFactory)
|
||||
{
|
||||
if (options == null)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ using System.IO.Pipelines;
|
|||
using System.Security.Claims;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Connections.Features;
|
||||
using Microsoft.AspNetCore.SignalR.Internal;
|
||||
using Microsoft.AspNetCore.SignalR.Internal.Formatters;
|
||||
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
|
||||
|
|
@ -16,8 +18,11 @@ using Microsoft.AspNetCore.Sockets;
|
|||
|
||||
namespace Microsoft.AspNetCore.SignalR.Tests
|
||||
{
|
||||
public class TestClient : IDisposable
|
||||
public class TestClient : ITransferFormatFeature, IConnectionHeartbeatFeature, IDisposable
|
||||
{
|
||||
private object _heartbeatLock = new object();
|
||||
private List<(Action<object> handler, object state)> _heartbeatHandlers;
|
||||
|
||||
private static int _id;
|
||||
private readonly IHubProtocol _protocol;
|
||||
private readonly IInvocationBinder _invocationBinder;
|
||||
|
|
@ -28,12 +33,20 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
public Task Connected => ((TaskCompletionSource<bool>)Connection.Items["ConnectedTask"]).Task;
|
||||
public HandshakeResponseMessage HandshakeResponseMessage { get; private set; }
|
||||
|
||||
public TransferFormat SupportedFormats { get; set; } = TransferFormat.Text | TransferFormat.Binary;
|
||||
|
||||
public TransferFormat ActiveFormat { get; set; }
|
||||
|
||||
public TestClient(bool synchronousCallbacks = false, IHubProtocol protocol = null, IInvocationBinder invocationBinder = null, bool addClaimId = false)
|
||||
{
|
||||
var options = new PipeOptions(readerScheduler: synchronousCallbacks ? PipeScheduler.Inline : null);
|
||||
var pair = DuplexPipe.CreateConnectionPair(options, options);
|
||||
Connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), pair.Transport, pair.Application);
|
||||
|
||||
// Add features SignalR needs for testing
|
||||
Connection.Features.Set<ITransferFormatFeature>(this);
|
||||
Connection.Features.Set<IConnectionHeartbeatFeature>(this);
|
||||
|
||||
var claimValue = Interlocked.Increment(ref _id).ToString();
|
||||
var claims = new List<Claim> { new Claim(ClaimTypes.Name, claimValue) };
|
||||
if (addClaimId)
|
||||
|
|
@ -269,6 +282,33 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
return Guid.NewGuid().ToString("N");
|
||||
}
|
||||
|
||||
public void OnHeartbeat(Action<object> action, object state)
|
||||
{
|
||||
lock (_heartbeatLock)
|
||||
{
|
||||
if (_heartbeatHandlers == null)
|
||||
{
|
||||
_heartbeatHandlers = new List<(Action<object> handler, object state)>();
|
||||
}
|
||||
_heartbeatHandlers.Add((action, state));
|
||||
}
|
||||
}
|
||||
|
||||
public void TickHeartbeat()
|
||||
{
|
||||
lock (_heartbeatLock)
|
||||
{
|
||||
if (_heartbeatHandlers == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var (handler, state) in _heartbeatHandlers)
|
||||
{
|
||||
handler(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
private class DefaultInvocationBinder : IInvocationBinder
|
||||
{
|
||||
public IReadOnlyList<Type> GetParameterTypes(string methodName)
|
||||
|
|
|
|||
|
|
@ -339,7 +339,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
|
||||
using (var client = new TestClient(protocol: new MessagePackHubProtocol()))
|
||||
{
|
||||
client.Connection.SupportedFormats = TransferFormat.Text;
|
||||
client.SupportedFormats = TransferFormat.Text;
|
||||
|
||||
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
|
||||
|
||||
|
|
@ -1430,7 +1430,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
|
||||
using (var client = new TestClient(synchronousCallbacks: false, protocol: protocol, invocationBinder: invocationBinder.Object))
|
||||
{
|
||||
client.Connection.SupportedFormats = protocol.TransferFormat;
|
||||
client.SupportedFormats = protocol.TransferFormat;
|
||||
|
||||
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
|
||||
|
||||
|
|
@ -1711,7 +1711,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
var msgPackOptions = serviceProvider.GetRequiredService<IOptions<MessagePackHubProtocolOptions>>();
|
||||
using (var client = new TestClient(synchronousCallbacks: false, protocol: new MessagePackHubProtocol(msgPackOptions)))
|
||||
{
|
||||
client.Connection.SupportedFormats = TransferFormat.Binary;
|
||||
client.SupportedFormats = TransferFormat.Binary;
|
||||
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
|
||||
|
||||
await client.Connected.OrTimeout();
|
||||
|
|
@ -1860,7 +1860,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
// being available for the endpoint to run.
|
||||
for (var i = 0; i < 50; i += 1)
|
||||
{
|
||||
client.Connection.TickHeartbeat();
|
||||
client.TickHeartbeat();
|
||||
await Task.Yield();
|
||||
await Task.Delay(10);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -738,7 +738,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
await request1;
|
||||
|
||||
Assert.Equal(StatusCodes.Status204NoContent, context1.Response.StatusCode);
|
||||
Assert.Equal(DefaultConnectionContext.ConnectionStatus.Active, connection.Status);
|
||||
Assert.Equal(HttpConnectionContext.ConnectionStatus.Active, connection.Status);
|
||||
|
||||
Assert.False(request2.IsCompleted);
|
||||
|
||||
|
|
@ -757,7 +757,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
{
|
||||
var manager = CreateConnectionManager(loggerFactory);
|
||||
var connection = manager.CreateConnection();
|
||||
connection.Status = DefaultConnectionContext.ConnectionStatus.Disposed;
|
||||
connection.Status = HttpConnectionContext.ConnectionStatus.Disposed;
|
||||
|
||||
var dispatcher = new HttpConnectionDispatcher(manager, loggerFactory);
|
||||
|
||||
|
|
@ -804,7 +804,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
|
||||
await task;
|
||||
|
||||
Assert.Equal(DefaultConnectionContext.ConnectionStatus.Inactive, connection.Status);
|
||||
Assert.Equal(HttpConnectionContext.ConnectionStatus.Inactive, connection.Status);
|
||||
Assert.NotNull(connection.GetHttpContext());
|
||||
|
||||
Assert.Equal(StatusCodes.Status200OK, context.Response.StatusCode);
|
||||
|
|
@ -1459,9 +1459,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
}
|
||||
}
|
||||
|
||||
private static ConnectionManager CreateConnectionManager(ILoggerFactory loggerFactory)
|
||||
private static HttpConnectionManager CreateConnectionManager(ILoggerFactory loggerFactory)
|
||||
{
|
||||
return new ConnectionManager(new Logger<ConnectionManager>(loggerFactory ?? new LoggerFactory()), new EmptyApplicationLifetime());
|
||||
return new HttpConnectionManager(new Logger<HttpConnectionManager>(loggerFactory ?? new LoggerFactory()), new EmptyApplicationLifetime());
|
||||
}
|
||||
|
||||
private string GetContentAsString(Stream body)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ using Xunit;
|
|||
|
||||
namespace Microsoft.AspNetCore.Sockets.Tests
|
||||
{
|
||||
public class ConnectionManagerTests
|
||||
public class HttpConnectionManagerTests
|
||||
{
|
||||
[Fact]
|
||||
public void NewConnectionsHaveConnectionId()
|
||||
|
|
@ -19,7 +19,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
var connection = connectionManager.CreateConnection();
|
||||
|
||||
Assert.NotNull(connection.ConnectionId);
|
||||
Assert.Equal(DefaultConnectionContext.ConnectionStatus.Inactive, connection.Status);
|
||||
Assert.Equal(HttpConnectionContext.ConnectionStatus.Inactive, connection.Status);
|
||||
Assert.Null(connection.ApplicationTask);
|
||||
Assert.Null(connection.TransportTask);
|
||||
Assert.Null(connection.Cancellation);
|
||||
|
|
@ -187,7 +187,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
Assert.NotNull(connection.Transport);
|
||||
|
||||
await connection.DisposeAsync();
|
||||
Assert.Equal(DefaultConnectionContext.ConnectionStatus.Disposed, connection.Status);
|
||||
Assert.Equal(HttpConnectionContext.ConnectionStatus.Disposed, connection.Status);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
|
@ -224,10 +224,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
await tcs.Task.OrTimeout();
|
||||
}
|
||||
|
||||
private static ConnectionManager CreateConnectionManager(IApplicationLifetime lifetime = null)
|
||||
private static HttpConnectionManager CreateConnectionManager(IApplicationLifetime lifetime = null)
|
||||
{
|
||||
lifetime = lifetime ?? new EmptyApplicationLifetime();
|
||||
return new ConnectionManager(new Logger<ConnectionManager>(new LoggerFactory()), lifetime);
|
||||
return new HttpConnectionManager(new Logger<HttpConnectionManager>(new LoggerFactory()), lifetime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -7,6 +7,7 @@ using System.IO.Pipelines;
|
|||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Sockets.Internal.Transports;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ using System.IO;
|
|||
using System.IO.Pipelines;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Sockets.Internal.Transports;
|
||||
|
|
|
|||
|
|
@ -32,11 +32,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
var ws = new WebSocketsTransport(new WebSocketOptions(), connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
// Give the server socket to the transport and run it
|
||||
|
|
@ -79,11 +79,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
connectionContext.ActiveFormat = transferFormat;
|
||||
var ws = new WebSocketsTransport(new WebSocketOptions(), connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
|
|
@ -116,7 +116,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
|
|
@ -139,7 +139,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
}
|
||||
}
|
||||
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
var ws = new WebSocketsTransport(new WebSocketOptions(), connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
// Give the server socket to the transport and run it
|
||||
|
|
@ -169,11 +169,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
var ws = new WebSocketsTransport(new WebSocketOptions(), connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
// Give the server socket to the transport and run it
|
||||
|
|
@ -201,7 +201,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
|
|
@ -210,7 +210,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
CloseTimeout = TimeSpan.FromSeconds(1)
|
||||
};
|
||||
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
var ws = new WebSocketsTransport(options, connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
var serverSocket = await feature.AcceptAsync();
|
||||
|
|
@ -236,7 +236,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
|
|
@ -245,7 +245,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
CloseTimeout = TimeSpan.FromSeconds(1)
|
||||
};
|
||||
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
var ws = new WebSocketsTransport(options, connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
var serverSocket = await feature.AcceptAsync();
|
||||
|
|
@ -271,7 +271,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
|
|
@ -281,7 +281,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
CloseTimeout = TimeSpan.FromSeconds(20)
|
||||
};
|
||||
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
var ws = new WebSocketsTransport(options, connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
var serverSocket = await feature.AcceptAsync();
|
||||
|
|
@ -311,7 +311,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
using (StartLog(out var loggerFactory, LogLevel.Debug))
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
|
||||
var connection = new DefaultConnectionContext("foo", pair.Transport, pair.Application);
|
||||
var connection = new HttpConnectionContext("foo", pair.Transport, pair.Application);
|
||||
|
||||
using (var feature = new TestWebSocketConnectionFeature())
|
||||
{
|
||||
|
|
@ -321,7 +321,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
CloseTimeout = TimeSpan.FromSeconds(20)
|
||||
};
|
||||
|
||||
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
|
||||
var connectionContext = new HttpConnectionContext(string.Empty, null, null);
|
||||
var ws = new WebSocketsTransport(options, connection.Application, connectionContext, loggerFactory);
|
||||
|
||||
var serverSocket = await feature.AcceptAsync();
|
||||
|
|
|
|||
Loading…
Reference in New Issue