Use Channel<byte[]> as the abstraction (#579)

* Use Channel<byte[]> as the abstraction
This commit is contained in:
David Fowler 2017-06-23 09:52:35 -07:00 committed by GitHub
parent 5234437af8
commit a84ba8820f
27 changed files with 160 additions and 153 deletions

View File

@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server
{
public async override Task OnConnectedAsync(ConnectionContext connection)
{
await connection.Transport.Output.WriteAsync(await connection.Transport.Input.ReadAsync());
await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
}
}
}

View File

@ -5,7 +5,6 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Sockets;
namespace SocialWeather
@ -40,7 +39,7 @@ namespace SocialWeather
var ms = new MemoryStream();
await formatter.WriteAsync(data, ms);
connection.Transport.Output.TryWrite(ms.ToArray());
connection.Transport.Out.TryWrite(ms.ToArray());
}
}

View File

@ -34,9 +34,9 @@ namespace SocialWeather
var formatter = _formatterResolver.GetFormatter<WeatherReport>(
connection.Metadata.Get<string>("formatType"));
while (await connection.Transport.Input.WaitToReadAsync())
while (await connection.Transport.In.WaitToReadAsync())
{
if (connection.Transport.Input.TryRead(out var buffer))
if (connection.Transport.In.TryRead(out var buffer))
{
var stream = new MemoryStream();
await stream.WriteAsync(buffer, 0, buffer.Length);

View File

@ -20,9 +20,9 @@ namespace SocketsSample.EndPoints
try
{
while (await connection.Transport.Input.WaitToReadAsync())
while (await connection.Transport.In.WaitToReadAsync())
{
if (connection.Transport.Input.TryRead(out var buffer))
if (connection.Transport.In.TryRead(out var buffer))
{
// We can avoid the copy here but we'll deal with that later
var text = Encoding.UTF8.GetString(buffer);
@ -50,7 +50,7 @@ namespace SocketsSample.EndPoints
foreach (var c in Connections)
{
tasks.Add(c.Transport.Output.WriteAsync(payload));
tasks.Add(c.Transport.Out.WriteAsync(payload));
}
return Task.WhenAll(tasks);

View File

@ -312,9 +312,9 @@ namespace Microsoft.AspNetCore.SignalR.Redis
var protocol = connection.Metadata.Get<IHubProtocol>(HubConnectionMetadataNames.HubProtocol);
var data = protocol.WriteToArray(hubMessage);
while (await connection.Transport.Output.WaitToWriteAsync())
while (await connection.Transport.Out.WaitToWriteAsync())
{
if (connection.Transport.Output.TryWrite(data))
if (connection.Transport.Out.TryWrite(data))
{
break;
}

View File

@ -124,9 +124,9 @@ namespace Microsoft.AspNetCore.SignalR
var protocol = connection.Metadata.Get<IHubProtocol>(HubConnectionMetadataNames.HubProtocol);
var payload = protocol.WriteToArray(hubMessage);
while (await connection.Transport.Output.WaitToWriteAsync())
while (await connection.Transport.Out.WaitToWriteAsync())
{
if (connection.Transport.Output.TryWrite(payload))
if (connection.Transport.Out.TryWrite(payload))
{
break;
}

View File

@ -154,9 +154,9 @@ namespace Microsoft.AspNetCore.SignalR
try
{
while (await connection.Transport.Input.WaitToReadAsync(cts.Token))
while (await connection.Transport.In.WaitToReadAsync(cts.Token))
{
while (connection.Transport.Input.TryRead(out var buffer))
while (connection.Transport.In.TryRead(out var buffer))
{
if (protocol.TryParseMessages(buffer, this, out var hubMessages))
{
@ -232,9 +232,9 @@ namespace Microsoft.AspNetCore.SignalR
{
var payload = protocol.WriteToArray(hubMessage);
while (await connection.Transport.Output.WaitToWriteAsync())
while (await connection.Transport.Out.WaitToWriteAsync())
{
if (connection.Transport.Output.TryWrite(payload))
if (connection.Transport.Out.TryWrite(payload))
{
return;
}

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks.Channels;
namespace Microsoft.AspNetCore.Sockets.Internal
@ -18,20 +19,36 @@ namespace Microsoft.AspNetCore.Sockets.Internal
}
}
public class ChannelConnection<T> : ChannelConnection<T, T>, IChannelConnection<T>
public class ChannelConnection<T> : Channel<T>, IDisposable
{
public Channel<T> Input { get; }
public Channel<T> Output { get; }
public override ReadableChannel<T> In => Input;
public override WritableChannel<T> Out => Output;
public ChannelConnection(Channel<T> input, Channel<T> output)
: base(input, output)
{ }
{
Input = input;
Output = output;
}
public void Dispose()
{
Input.Out.TryComplete();
Output.Out.TryComplete();
}
}
public class ChannelConnection<TIn, TOut> : IChannelConnection<TIn, TOut>
public class ChannelConnection<TIn, TOut> : Channel<TOut, TIn>, IDisposable
{
public Channel<TIn> Input { get; }
public Channel<TOut> Output { get; }
ReadableChannel<TIn> IChannelConnection<TIn, TOut>.Input => Input;
WritableChannel<TOut> IChannelConnection<TIn, TOut>.Output => Output;
public override ReadableChannel<TIn> In => Input;
public override WritableChannel<TOut> Out => Output;
public ChannelConnection(Channel<TIn> input, Channel<TOut> output)
{
@ -41,8 +58,8 @@ namespace Microsoft.AspNetCore.Sockets.Internal
public void Dispose()
{
Output.Out.TryComplete();
Input.Out.TryComplete();
Output.Out.TryComplete();
}
}
}

View File

@ -2,10 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Security.Claims;
using System.Text;
using System.Threading;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.Sockets
@ -22,6 +20,6 @@ namespace Microsoft.AspNetCore.Sockets
public abstract ConnectionMetadata Metadata { get; }
// TEMPORARY
public abstract IChannelConnection<byte[]> Transport { get; set; }
public abstract Channel<byte[]> Transport { get; set; }
}
}

View File

@ -5,6 +5,7 @@ using System;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.Sockets
@ -15,7 +16,7 @@ namespace Microsoft.AspNetCore.Sockets
// on the same task
private TaskCompletionSource<object> _disposeTcs = new TaskCompletionSource<object>();
public DefaultConnectionContext(string id, IChannelConnection<byte[]> transport, IChannelConnection<byte[]> application)
public DefaultConnectionContext(string id, Channel<byte[]> transport, Channel<byte[]> application)
{
Transport = transport;
Application = application;
@ -43,9 +44,9 @@ namespace Microsoft.AspNetCore.Sockets
public override ConnectionMetadata Metadata { get; } = new ConnectionMetadata();
public IChannelConnection<byte[]> Application { get; }
public Channel<byte[]> Application { get; }
public override IChannelConnection<byte[]> Transport { get; set; }
public override Channel<byte[]> Transport { get; set; }
public async Task DisposeAsync()
{
@ -66,17 +67,22 @@ namespace Microsoft.AspNetCore.Sockets
// If the application task is faulted, propagate the error to the transport
if (ApplicationTask?.IsFaulted == true)
{
Transport.Output.TryComplete(ApplicationTask.Exception.InnerException);
Transport.Out.TryComplete(ApplicationTask.Exception.InnerException);
}
else
{
Transport.Out.TryComplete();
}
// If the transport task is faulted, propagate the error to the application
if (TransportTask?.IsFaulted == true)
{
Application.Output.TryComplete(TransportTask.Exception.InnerException);
Application.Out.TryComplete(TransportTask.Exception.InnerException);
}
else
{
Application.Out.TryComplete();
}
Transport.Dispose();
Application.Dispose();
var applicationTask = ApplicationTask ?? Task.CompletedTask;
var transportTask = TransportTask ?? Task.CompletedTask;

View File

@ -1,20 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks.Channels;
namespace Microsoft.AspNetCore.Sockets
{
// REVIEW: These should probably move to Channels. Why not use IChannel? Because I think it's better to be clear that this is providing
// access to two separate channels, the read end for one and the write end for the other.
public interface IChannelConnection<T> : IChannelConnection<T, T>
{
}
public interface IChannelConnection<TIn, TOut> : IDisposable
{
ReadableChannel<TIn> Input { get; }
WritableChannel<TOut> Output { get; }
}
}

View File

@ -21,7 +21,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
private readonly ILogger _logger;
private volatile int _connectionState = ConnectionState.Initial;
private volatile IChannelConnection<byte[], SendMessage> _transportChannel;
private volatile ChannelConnection<byte[], SendMessage> _transportChannel;
private readonly HttpClient _httpClient;
private volatile ITransport _transport;
private volatile Task _receiveLoopTask;
@ -29,8 +29,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
private TaskQueue _eventQueue = new TaskQueue();
private readonly ITransportFactory _transportFactory;
private ReadableChannel<byte[]> Input => _transportChannel.Input;
private WritableChannel<SendMessage> Output => _transportChannel.Output;
private ReadableChannel<byte[]> Input => _transportChannel.In;
private WritableChannel<SendMessage> Output => _transportChannel.Out;
public Uri Url { get; }
@ -250,9 +250,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
var applicationToTransport = Channel.CreateUnbounded<SendMessage>();
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationSide = new ChannelConnection<SendMessage, byte[]>(applicationToTransport, transportToApplication);
_transportChannel = new ChannelConnection<byte[], SendMessage>(transportToApplication, applicationToTransport);
var applicationSide = ChannelConnection.Create(applicationToTransport, transportToApplication);
_transportChannel = ChannelConnection.Create(transportToApplication, applicationToTransport);
// Start the transport, giving it one end of the pipeline
try

View File

@ -3,12 +3,13 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
namespace Microsoft.AspNetCore.Sockets.Client
{
public interface ITransport
{
Task StartAsync(Uri url, IChannelConnection<SendMessage, byte[]> application);
Task StartAsync(Uri url, Channel<byte[], SendMessage> application);
Task StopAsync();
}
}

View File

@ -6,6 +6,7 @@ using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -15,7 +16,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
private readonly HttpClient _httpClient;
private readonly ILogger _logger;
private IChannelConnection<SendMessage, byte[]> _application;
private Channel<byte[], SendMessage> _application;
private Task _sender;
private Task _poller;
@ -33,7 +34,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
}
public Task StartAsync(Uri url, IChannelConnection<SendMessage, byte[]> application)
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application)
{
_logger.LogInformation("Starting {0}", nameof(LongPollingTransport));
@ -47,7 +48,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.LogDebug("Transport stopped. Exception: '{0}'", t.Exception?.InnerException);
_application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
_application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
@ -100,9 +101,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
var payload = await response.Content.ReadAsByteArrayAsync();
if (payload.Length > 0)
{
while (!_application.Output.TryWrite(payload))
while (!_application.Out.TryWrite(payload))
{
if (cancellationToken.IsCancellationRequested || !await _application.Output.WaitToWriteAsync(cancellationToken))
if (cancellationToken.IsCancellationRequested || !await _application.Out.WaitToWriteAsync(cancellationToken))
{
return;
}

View File

@ -8,6 +8,7 @@ using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Client
@ -17,17 +18,17 @@ namespace Microsoft.AspNetCore.Sockets.Client
private static readonly string DefaultUserAgent = "Microsoft.AspNetCore.SignalR.Client/0.0.0";
public static readonly ProductInfoHeaderValue DefaultUserAgentHeader = ProductInfoHeaderValue.Parse(DefaultUserAgent);
public static async Task SendMessages(Uri sendUrl, IChannelConnection<SendMessage, byte[]> application, HttpClient httpClient, CancellationTokenSource transportCts, ILogger logger)
public static async Task SendMessages(Uri sendUrl, Channel<byte[], SendMessage> application, HttpClient httpClient, CancellationTokenSource transportCts, ILogger logger)
{
logger.LogInformation("Starting the send loop");
IList<SendMessage> messages = null;
try
{
while (await application.Input.WaitToReadAsync(transportCts.Token))
while (await application.In.WaitToReadAsync(transportCts.Token))
{
// Grab as many messages as we can from the channel
messages = new List<SendMessage>();
while (!transportCts.Token.IsCancellationRequested && application.Input.TryRead(out SendMessage message))
while (!transportCts.Token.IsCancellationRequested && application.In.TryRead(out SendMessage message))
{
messages.Add(message);
}

View File

@ -7,6 +7,7 @@ using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -20,7 +21,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser();
private IChannelConnection<SendMessage, byte[]> _application;
private Channel<byte[], SendMessage> _application;
public Task Running { get; private set; } = Task.CompletedTask;
@ -39,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
}
public Task StartAsync(Uri url, IChannelConnection<SendMessage, byte[]> application)
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application)
{
_logger.LogInformation("Starting {transportName}", nameof(ServerSentEventsTransport));
@ -54,14 +55,14 @@ namespace Microsoft.AspNetCore.Sockets.Client
_logger.LogError(0, t.Exception.InnerException, "Transport stopped");
}
_application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
_application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
return Task.CompletedTask;
}
private async Task OpenConnection(IChannelConnection<SendMessage, byte[]> application, Uri url, CancellationToken cancellationToken)
private async Task OpenConnection(Channel<byte[], SendMessage> application, Uri url, CancellationToken cancellationToken)
{
_logger.LogInformation("Starting receive loop");
@ -94,7 +95,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
switch (parseResult)
{
case ServerSentEventsMessageParser.ParseResult.Completed:
_application.Output.TryWrite(buffer);
_application.Out.TryWrite(buffer);
_parser.Reset();
break;
case ServerSentEventsMessageParser.ParseResult.Incomplete:
@ -122,7 +123,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.LogInformation("Transport {transportName} is stopping", nameof(ServerSentEventsTransport));
_transportCts.Cancel();
_application.Output.TryComplete();
_application.Out.TryComplete();
await Running;
}
}

View File

@ -2,11 +2,12 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading.Tasks.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -15,7 +16,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
public class WebSocketsTransport : ITransport
{
private readonly ClientWebSocket _webSocket = new ClientWebSocket();
private IChannelConnection<SendMessage, byte[]> _application;
private Channel<byte[], SendMessage> _application;
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
private readonly ILogger _logger;
@ -31,7 +32,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
public Task Running { get; private set; } = Task.CompletedTask;
public async Task StartAsync(Uri url, IChannelConnection<SendMessage, byte[]> application)
public async Task StartAsync(Uri url, Channel<byte[], SendMessage> application)
{
_logger.LogInformation("Starting {0}", nameof(WebSocketsTransport));
@ -57,7 +58,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.LogDebug("Transport stopped. Exception: '{0}'", t.Exception?.InnerException);
_application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
_application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
}
@ -84,7 +85,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.LogInformation("Websocket closed by the server. Close status {0}", receiveResult.CloseStatus);
_application.Output.Complete(
_application.Out.Complete(
receiveResult.CloseStatus == WebSocketCloseStatus.NormalClosure
? null
: new InvalidOperationException(
@ -119,9 +120,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
_logger.LogInformation("Passing message to application. Payload size: {0}", messageBuffer.Length);
while (await _application.Output.WaitToWriteAsync(_transportCts.Token))
while (await _application.Out.WaitToWriteAsync(_transportCts.Token))
{
if (_application.Output.TryWrite(messageBuffer))
if (_application.Out.TryWrite(messageBuffer))
{
incomingMessage.Clear();
break;
@ -146,9 +147,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
try
{
while (await _application.Input.WaitToReadAsync(_transportCts.Token))
while (await _application.In.WaitToReadAsync(_transportCts.Token))
{
while (_application.Input.TryRead(out SendMessage message))
while (_application.In.TryRead(out SendMessage message))
{
try
{

View File

@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.Sockets
}
// We only need to provide the Input channel since writing to the application is handled through /send.
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory);
var sse = new ServerSentEventsTransport(connection.Application.In, connection.ConnectionId, _loggerFactory);
await DoPersistentConnection(socketDelegate, sse, context, connection);
}
@ -184,7 +184,7 @@ namespace Microsoft.AspNetCore.Sockets
context.Response.RegisterForDispose(timeoutSource);
context.Response.RegisterForDispose(tokenSource);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, connection.ConnectionId, _loggerFactory);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.In, connection.ConnectionId, _loggerFactory);
// Start the transport
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
@ -206,7 +206,7 @@ namespace Microsoft.AspNetCore.Sockets
if (resultTask == connection.ApplicationTask)
{
// Complete the transport (notifying it of the application error if there is one)
connection.Transport.Output.TryComplete(connection.ApplicationTask.Exception);
connection.Transport.Out.TryComplete(connection.ApplicationTask.Exception);
// Wait for the transport to run
await connection.TransportTask;
@ -384,9 +384,9 @@ namespace Microsoft.AspNetCore.Sockets
}
_logger.ReceivedBytes(connection.ConnectionId, buffer.Length);
while (!connection.Application.Output.TryWrite(buffer))
while (!connection.Application.Out.TryWrite(buffer))
{
if (!await connection.Application.Output.WaitToWriteAsync())
if (!await connection.Application.Out.WaitToWriteAsync())
{
return;
}

View File

@ -7,6 +7,7 @@ using System.Diagnostics;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
@ -16,10 +17,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
{
private readonly WebSocketOptions _options;
private readonly ILogger _logger;
private readonly IChannelConnection<byte[]> _application;
private readonly Channel<byte[]> _application;
private readonly string _connectionId;
public WebSocketsTransport(WebSocketOptions options, IChannelConnection<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
public WebSocketsTransport(WebSocketOptions options, Channel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
{
if (options == null)
{
@ -80,7 +81,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
}
// We're done writing
_application.Output.TryComplete();
_application.Out.TryComplete();
await socket.CloseOutputAsync(failed ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
@ -153,9 +154,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
}
_logger.MessageToApplication(_connectionId, messageBuffer.Length);
while (await _application.Output.WaitToWriteAsync())
while (await _application.Out.WaitToWriteAsync())
{
if (_application.Output.TryWrite(messageBuffer))
if (_application.Out.TryWrite(messageBuffer))
{
incomingMessage.Clear();
break;
@ -166,10 +167,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
private async Task StartSending(WebSocket ws)
{
while (await _application.Input.WaitToReadAsync())
while (await _application.In.WaitToReadAsync())
{
// Get a frame from the application
while (_application.Input.TryRead(out var buffer))
while (_application.In.TryRead(out var buffer))
{
if (buffer.Length > 0)
{

View File

@ -54,8 +54,8 @@ namespace Microsoft.AspNetCore.Sockets
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication);
var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport);
var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication);
var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport);
var connection = new DefaultConnectionContext(id, applicationSide, transportSide);

View File

@ -7,6 +7,7 @@ using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.Extensions.Logging;
@ -151,7 +152,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
releaseDisposeTcs.SetResult(null);
await disposeTask.OrTimeout();
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<IChannelConnection<SendMessage, byte[]>>()), Times.Never);
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>()), Times.Never);
}
[Fact]
@ -234,7 +235,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
var mockTransport = new Mock<ITransport>();
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<IChannelConnection<SendMessage, byte[]>>()))
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>()))
.Returns(Task.FromException(new InvalidOperationException("Transport failed to start")));
@ -330,9 +331,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
var mockTransport = new Mock<ITransport>();
IChannelConnection<SendMessage, byte[]> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<IChannelConnection<SendMessage, byte[]>>()))
.Returns<Uri, IChannelConnection<SendMessage, byte[]>>((url, c) =>
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>()))
.Returns<Uri, Channel<byte[], SendMessage>>((url, c) =>
{
channel = c;
return Task.CompletedTask;
@ -342,8 +343,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
// The connection is now in the Disconnected state so the Received event for
// this message should not be raised
channel.Output.TryWrite(Array.Empty<byte>());
channel.Output.TryComplete();
channel.Out.TryWrite(Array.Empty<byte>());
channel.Out.TryComplete();
return Task.CompletedTask;
});
@ -372,9 +373,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
var mockTransport = new Mock<ITransport>();
IChannelConnection<SendMessage, byte[]> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<IChannelConnection<SendMessage, byte[]>>()))
.Returns<Uri, IChannelConnection<SendMessage, byte[]>>((url, c) =>
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>()))
.Returns<Uri, Channel<byte[], SendMessage>>((url, c) =>
{
channel = c;
return Task.CompletedTask;
@ -382,7 +383,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Output.TryComplete();
channel.Out.TryComplete();
return Task.CompletedTask;
});
@ -404,8 +405,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
connection.Closed += e => closedTcs.SetResult(null);
await connection.StartAsync();
channel.Output.TryWrite(Array.Empty<byte>());
channel.Output.TryWrite(Array.Empty<byte>());
channel.Out.TryWrite(Array.Empty<byte>());
channel.Out.TryWrite(Array.Empty<byte>());
await allowDisposeTcs.Task.OrTimeout();
await connection.DisposeAsync();
Assert.Equal(2, receivedInvocationCount);

View File

@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.Client.Tests
{
var connectionToTransport = Channel.CreateUnbounded<SendMessage>();
var transportToConnection = Channel.CreateUnbounded<byte[]>();
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
var channelConnection = ChannelConnection.Create(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
await longPollingTransport.Running.OrTimeout();

View File

@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
public async override Task OnConnectedAsync(ConnectionContext connection)
{
await connection.Transport.Output.WriteAsync(await connection.Transport.Input.ReadAsync());
await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
}
}
}

View File

@ -20,9 +20,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
private static int _id;
private IHubProtocol _protocol;
private CancellationTokenSource _cts;
private ChannelConnection<byte[]> _transport;
public DefaultConnectionContext Connection { get; }
public IChannelConnection<byte[]> Application { get; }
public Channel<byte[]> Application { get; }
public Task Connected => Connection.Metadata.Get<TaskCompletionSource<bool>>("ConnectedTask").Task;
public TestClient()
@ -31,9 +32,9 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
Application = ChannelConnection.Create<byte[]>(input: applicationToTransport, output: transportToApplication);
var transport = ChannelConnection.Create<byte[]>(input: transportToApplication, output: applicationToTransport);
_transport = ChannelConnection.Create<byte[]>(input: transportToApplication, output: applicationToTransport);
Connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), transport, Application);
Connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), _transport, Application);
Connection.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.Name, Interlocked.Increment(ref _id).ToString()) }));
Connection.Metadata["ConnectedTask"] = new TaskCompletionSource<bool>();
@ -110,7 +111,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var invocationId = GetInvocationId();
var payload = _protocol.WriteToArray(new InvocationMessage(invocationId, nonBlocking: false, target: methodName, arguments: args));
await Application.Output.WriteAsync(payload);
await Application.Out.WriteAsync(payload);
return invocationId;
}
@ -123,7 +124,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
if (message == null)
{
if (!await Application.Input.WaitToReadAsync())
if (!await Application.In.WaitToReadAsync())
{
return null;
}
@ -137,7 +138,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public HubMessage TryRead()
{
if (Application.Input.TryRead(out var buffer) &&
if (Application.In.TryRead(out var buffer) &&
_protocol.TryParseMessages(buffer, this, out var messages))
{
return messages[0];
@ -148,7 +149,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public void Dispose()
{
_cts.Cancel();
Connection.Transport.Dispose();
_transport.Dispose();
}
private static string GetInvocationId()

View File

@ -81,12 +81,12 @@ namespace Microsoft.AspNetCore.Sockets.Tests
connection.ApplicationTask = Task.Run(async () =>
{
Assert.False(await connection.Transport.Input.WaitToReadAsync());
Assert.False(await connection.Transport.In.WaitToReadAsync());
});
connection.TransportTask = Task.Run(async () =>
{
Assert.False(await connection.Application.Input.WaitToReadAsync());
Assert.False(await connection.Application.In.WaitToReadAsync());
});
connectionManager.CloseConnections();

View File

@ -478,7 +478,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the transport so the poll yields
await connection.Transport.Output.WriteAsync(buffer);
await connection.Transport.Out.WriteAsync(buffer);
await task;
@ -510,7 +510,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the application
await connection.Application.Output.WriteAsync(buffer);
await connection.Application.Out.WriteAsync(buffer);
await task;
@ -540,7 +540,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the application
await connection.Application.Output.WriteAsync(buffer);
await connection.Application.Out.WriteAsync(buffer);
await task;
@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
await task1.OrTimeout();
// Send a message from the app to complete Task 2
await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
await task2.OrTimeout();
@ -703,7 +703,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
var endPointTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await endPointTask.OrTimeout();
@ -777,7 +777,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
}));
var endPointTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await endPointTask.OrTimeout();
@ -827,7 +827,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
var endPointTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await endPointTask.OrTimeout();
@ -1036,7 +1036,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
public override Task OnConnectedAsync(ConnectionContext connection)
{
connection.Transport.Input.WaitToReadAsync().Wait();
connection.Transport.In.WaitToReadAsync().Wait();
return Task.CompletedTask;
}
}
@ -1061,7 +1061,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
public override async Task OnConnectedAsync(ConnectionContext connection)
{
while (await connection.Transport.Input.WaitToReadAsync())
while (await connection.Transport.In.WaitToReadAsync())
{
}
}

View File

@ -25,8 +25,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
@ -45,10 +45,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
cancellationToken: CancellationToken.None);
await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
var buffer = await applicationSide.Input.In.ReadAsync();
var buffer = await applicationSide.In.ReadAsync();
Assert.Equal("Hello", Encoding.UTF8.GetString(buffer));
Assert.True(applicationSide.Output.Out.TryComplete());
Assert.True(applicationSide.Out.TryComplete());
// The transport should finish now
await transport;
@ -68,8 +68,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions() { WebSocketMessageType = webSocketMessageType }, transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
@ -81,8 +81,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Write to the output channel, and then complete it
await applicationSide.Output.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
Assert.True(applicationSide.Output.Out.TryComplete());
await applicationSide.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
Assert.True(applicationSide.Out.TryComplete());
// The client should finish now, as should the server
var clientSummary = await client;
@ -102,8 +102,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var options = new WebSocketOptions()
@ -136,8 +136,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionId: string.Empty, loggerFactory: new LoggerFactory());
@ -149,7 +149,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Fail in the app
Assert.True(applicationSide.Output.Out.TryComplete(new InvalidOperationException("Catastrophic failure.")));
Assert.True(applicationSide.Out.TryComplete(new InvalidOperationException("Catastrophic failure.")));
var clientSummary = await client.OrTimeout();
Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus);
@ -167,8 +167,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var options = new WebSocketOptions()
@ -200,8 +200,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var options = new WebSocketOptions()
@ -233,8 +233,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var options = new WebSocketOptions()
@ -270,8 +270,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = new ChannelConnection<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = new ChannelConnection<byte[]>(transportToApplication, applicationToTransport))
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var options = new WebSocketOptions()