Updated to new System.IO.Pipelines package instead of Channels

This commit is contained in:
David Fowler 2016-11-15 21:46:18 -08:00
parent f3ebe03a0b
commit 4aa65cf0bf
41 changed files with 163 additions and 164 deletions

View File

@ -2,7 +2,6 @@
<configuration>
<packageSources>
<add key="NuGet" value="https://api.nuget.org/v3/index.json" />
<add key="Channels" value="https://www.myget.org/F/channels/api/v3/index.json" />
<add key="dotnet-corefxlab" value="https://dotnet.myget.org/F/dotnet-corefxlab/api/v3/index.json" />
<add key="AspNetCore" value="https://dotnet.myget.org/F/aspnetcore-ci-dev/api/v3/index.json" />
</packageSources>

View File

@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
namespace SocialWeather

View File

@ -1,5 +1,5 @@
using System.Threading.Tasks;
using Channels;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.Logging;

View File

@ -2,9 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
namespace SocketsSample

View File

@ -5,11 +5,11 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

View File

@ -3,9 +3,9 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.Internal;

View File

@ -3,10 +3,10 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

View File

@ -1,8 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
using System.Security.Claims;
using Channels;
namespace Microsoft.AspNetCore.Sockets
{
@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Sockets
{
public string ConnectionId { get; set; }
public ClaimsPrincipal User { get; set; }
public IChannel Channel { get; set; }
public IPipelineConnection Channel { get; set; }
public ConnectionMetadata Metadata { get; } = new ConnectionMetadata();
}
}

View File

@ -3,8 +3,8 @@
using System;
using System.Collections.Concurrent;
using System.IO.Pipelines;
using System.Threading;
using Channels;
namespace Microsoft.AspNetCore.Sockets
{
@ -38,7 +38,7 @@ namespace Microsoft.AspNetCore.Sockets
return state;
}
public ConnectionState AddNewConnection(IChannel channel)
public ConnectionState AddNewConnection(IPipelineConnection connection)
{
string id = MakeNewConnectionId();
@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.Sockets
{
Connection = new Connection
{
Channel = channel,
Channel = connection,
ConnectionId = id
},
LastSeen = DateTimeOffset.UtcNow,

View File

@ -2,25 +2,25 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Channels;
using System.IO.Pipelines;
namespace Microsoft.AspNetCore.Sockets
{
public class HttpChannel : IChannel
public class HttpConnection : IPipelineConnection
{
public HttpChannel(ChannelFactory factory)
public HttpConnection(PipelineFactory factory)
{
Input = factory.CreateChannel();
Output = factory.CreateChannel();
Input = factory.Create();
Output = factory.Create();
}
IReadableChannel IChannel.Input => Input;
IPipelineReader IPipelineConnection.Input => Input;
IWritableChannel IChannel.Output => Output;
IPipelineWriter IPipelineConnection.Output => Output;
public Channel Input { get; }
public PipelineReaderWriter Input { get; }
public Channel Output { get; }
public PipelineReaderWriter Output { get; }
public void Dispose()
{

View File

@ -2,9 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@ -15,13 +15,13 @@ namespace Microsoft.AspNetCore.Sockets
public class HttpConnectionDispatcher
{
private readonly ConnectionManager _manager;
private readonly ChannelFactory _channelFactory;
private readonly PipelineFactory _pipelineFactory;
private readonly ILoggerFactory _loggerFactory;
public HttpConnectionDispatcher(ConnectionManager manager, ChannelFactory factory, ILoggerFactory loggerFactory)
public HttpConnectionDispatcher(ConnectionManager manager, PipelineFactory factory, ILoggerFactory loggerFactory)
{
_manager = manager;
_channelFactory = factory;
_pipelineFactory = factory;
_loggerFactory = loggerFactory;
}
@ -169,13 +169,13 @@ namespace Microsoft.AspNetCore.Sockets
private static void RegisterLongPollingDisconnect(HttpContext context, Connection connection)
{
// For long polling, we need to end the transport but not the overall connection so we write 0 bytes
context.RequestAborted.Register(state => ((HttpChannel)state).Output.WriteAsync(Span<byte>.Empty), connection.Channel);
context.RequestAborted.Register(state => ((HttpConnection)state).Output.WriteAsync(Span<byte>.Empty), connection.Channel);
}
private static void RegisterDisconnect(HttpContext context, Connection connection)
{
// We just kill the output writing as a signal to the transport that it is done
context.RequestAborted.Register(state => ((HttpChannel)state).Output.CompleteWriter(), connection.Channel);
context.RequestAborted.Register(state => ((HttpConnection)state).Output.CompleteWriter(), connection.Channel);
}
private Task ProcessGetId(HttpContext context)
@ -204,7 +204,7 @@ namespace Microsoft.AspNetCore.Sockets
{
// If we received an HTTP POST for the connection id and it's not an HttpChannel then fail.
// You can't write to a TCP channel directly from here.
var httpChannel = state.Connection.Channel as HttpChannel;
var httpChannel = state.Connection.Channel as HttpConnection;
if (httpChannel == null)
{
@ -233,7 +233,7 @@ namespace Microsoft.AspNetCore.Sockets
if (StringValues.IsNullOrEmpty(connectionId))
{
isNewConnection = true;
var channel = new HttpChannel(_channelFactory);
var channel = new HttpConnection(_pipelineFactory);
connectionState = _manager.AddNewConnection(channel);
}
else
@ -250,7 +250,7 @@ namespace Microsoft.AspNetCore.Sockets
if (connectionState.Connection.Channel == null)
{
isNewConnection = true;
connectionState.Connection.Channel = new HttpChannel(_channelFactory);
connectionState.Connection.Channel = new HttpConnection(_pipelineFactory);
connectionState.Active = true;
connectionState.LastSeen = DateTimeOffset.UtcNow;
}

View File

@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Channels;
using System.IO.Pipelines;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Sockets;
@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.Builder
public static IApplicationBuilder UseSockets(this IApplicationBuilder app, Action<SocketRouteBuilder> callback)
{
var manager = new ConnectionManager();
var factory = new ChannelFactory();
var factory = new PipelineFactory();
var loggerFactory = app.ApplicationServices.GetService<ILoggerFactory>();
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory);

View File

@ -2,21 +2,21 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
namespace Microsoft.AspNetCore.Sockets
{
public class LongPolling : IHttpTransport
{
private readonly HttpChannel _channel;
private readonly HttpConnection _channel;
private readonly Connection _connection;
public LongPolling(Connection connection)
{
_connection = connection;
_channel = (HttpChannel)connection.Channel;
_channel = (HttpConnection)connection.Channel;
}
public async Task ProcessRequestAsync(HttpContext context)

View File

@ -2,21 +2,21 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
namespace Microsoft.AspNetCore.Sockets
{
public class ServerSentEvents : IHttpTransport
{
private readonly HttpChannel _channel;
private readonly HttpConnection _channel;
private readonly Connection _connection;
public ServerSentEvents(Connection connection)
{
_connection = connection;
_channel = (HttpChannel)connection.Channel;
_channel = (HttpConnection)connection.Channel;
}
public async Task ProcessRequestAsync(HttpContext context)

View File

@ -2,8 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.WebSockets.Internal;
using Microsoft.Extensions.Internal;
@ -18,13 +18,13 @@ namespace Microsoft.AspNetCore.Sockets
private static readonly TimeSpan _closeTimeout = TimeSpan.FromSeconds(5);
private static readonly WebSocketAcceptContext EmptyContext = new WebSocketAcceptContext();
private readonly HttpChannel _channel;
private readonly HttpConnection _channel;
private readonly WebSocketOpcode _opcode;
private readonly ILogger _logger;
public WebSockets(Connection connection, Format format, ILoggerFactory loggerFactory)
{
_channel = (HttpChannel)connection.Channel;
_channel = (HttpConnection)connection.Channel;
_opcode = format == Format.Binary ? WebSocketOpcode.Binary : WebSocketOpcode.Text;
_logger = (ILogger)loggerFactory?.CreateLogger<WebSockets>() ?? NullLogger.Instance;
@ -44,7 +44,7 @@ namespace Microsoft.AspNetCore.Sockets
_logger.LogInformation("Socket opened.");
// Begin sending and receiving. Receiving must be started first because ExecuteAsync enables SendAsync.
var receiving = ws.ExecuteAsync((frame, self) => ((WebSockets)self).HandleFrame(frame), this);
var receiving = ws.ExecuteAsync((frame, state) => ((WebSockets)state).HandleFrame(frame), this);
var sending = StartSending(ws);
// Wait for something to shut down.

View File

@ -22,7 +22,7 @@
},
"dependencies": {
"Channels": "0.2.0-beta-*",
"System.IO.Pipelines": "0.1.0-*",
"Microsoft.AspNetCore.Hosting.Abstractions": "1.2.0-*",
"Microsoft.AspNetCore.Routing": "1.2.0-*",
"Microsoft.AspNetCore.WebSockets.Internal": "0.1.0-*",

View File

@ -2,39 +2,39 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Channels;
using System.IO.Pipelines;
using Microsoft.AspNetCore.WebSockets.Internal;
namespace Microsoft.AspNetCore.Builder
{
public static class WebSocketAppBuilderExtensions
{
public static void UseWebSocketConnections(this IApplicationBuilder self)
public static void UseWebSocketConnections(this IApplicationBuilder app)
{
// Only the GC can clean up this channel factory :(
self.UseWebSocketConnections(new ChannelFactory(), new WebSocketConnectionOptions());
app.UseWebSocketConnections(new PipelineFactory(), new WebSocketConnectionOptions());
}
public static void UseWebSocketConnections(this IApplicationBuilder self, ChannelFactory channelFactory)
public static void UseWebSocketConnections(this IApplicationBuilder app, PipelineFactory factory)
{
if (channelFactory == null)
if (factory == null)
{
throw new ArgumentNullException(nameof(channelFactory));
throw new ArgumentNullException(nameof(factory));
}
self.UseWebSocketConnections(channelFactory, new WebSocketConnectionOptions());
app.UseWebSocketConnections(factory, new WebSocketConnectionOptions());
}
public static void UseWebSocketConnections(this IApplicationBuilder self, ChannelFactory channelFactory, WebSocketConnectionOptions options)
public static void UseWebSocketConnections(this IApplicationBuilder app, PipelineFactory factory, WebSocketConnectionOptions options)
{
if (channelFactory == null)
if (factory == null)
{
throw new ArgumentNullException(nameof(channelFactory));
throw new ArgumentNullException(nameof(factory));
}
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
self.UseMiddleware<WebSocketConnectionMiddleware>(channelFactory, options);
app.UseMiddleware<WebSocketConnectionMiddleware>(factory, options);
}
}
}

View File

@ -2,8 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
@ -16,7 +16,7 @@ namespace Microsoft.AspNetCore.WebSockets.Internal
private HttpContext _context;
private IHttpUpgradeFeature _upgradeFeature;
private ILogger _logger;
private readonly ChannelFactory _channelFactory;
private readonly PipelineFactory _factory;
public bool IsWebSocketRequest
{
@ -30,9 +30,9 @@ namespace Microsoft.AspNetCore.WebSockets.Internal
}
}
public WebSocketConnectionFeature(HttpContext context, ChannelFactory channelFactory, IHttpUpgradeFeature upgradeFeature, ILoggerFactory loggerFactory)
public WebSocketConnectionFeature(HttpContext context, PipelineFactory factory, IHttpUpgradeFeature upgradeFeature, ILoggerFactory loggerFactory)
{
_channelFactory = channelFactory;
_factory = factory;
_context = context;
_upgradeFeature = upgradeFeature;
_logger = loggerFactory.CreateLogger<WebSocketConnectionFeature>();
@ -70,8 +70,8 @@ namespace Microsoft.AspNetCore.WebSockets.Internal
_logger.LogDebug("Upgrading connection to WebSockets");
var opaqueTransport = await _upgradeFeature.UpgradeAsync();
var connection = new WebSocketConnection(
opaqueTransport.AsReadableChannel(),
_channelFactory.MakeWriteableChannel(opaqueTransport),
opaqueTransport.AsPipelineReader(),
_factory.CreateWriter(opaqueTransport),
subProtocol: subProtocol);
return connection;
}

View File

@ -2,8 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
@ -12,20 +12,20 @@ namespace Microsoft.AspNetCore.WebSockets.Internal
{
public class WebSocketConnectionMiddleware
{
private readonly ChannelFactory _channelFactory;
private readonly PipelineFactory _factory;
private readonly ILoggerFactory _loggerFactory;
private readonly RequestDelegate _next;
private readonly WebSocketConnectionOptions _options;
public WebSocketConnectionMiddleware(RequestDelegate next, ChannelFactory channelFactory, WebSocketConnectionOptions options, ILoggerFactory loggerFactory)
public WebSocketConnectionMiddleware(RequestDelegate next, PipelineFactory factory, WebSocketConnectionOptions options, ILoggerFactory loggerFactory)
{
if (next == null)
{
throw new ArgumentNullException(nameof(next));
}
if (channelFactory == null)
if (factory == null)
{
throw new ArgumentNullException(nameof(channelFactory));
throw new ArgumentNullException(nameof(factory));
}
if (options == null)
{
@ -38,7 +38,7 @@ namespace Microsoft.AspNetCore.WebSockets.Internal
_next = next;
_loggerFactory = loggerFactory;
_channelFactory = channelFactory;
_factory = factory;
_options = options;
}
@ -49,7 +49,7 @@ namespace Microsoft.AspNetCore.WebSockets.Internal
{
if (_options.ReplaceFeature || context.Features.Get<IHttpWebSocketConnectionFeature>() == null)
{
context.Features.Set<IHttpWebSocketConnectionFeature>(new WebSocketConnectionFeature(context, _channelFactory, upgradeFeature, _loggerFactory));
context.Features.Set<IHttpWebSocketConnectionFeature>(new WebSocketConnectionFeature(context, _factory, upgradeFeature, _loggerFactory));
}
}

View File

@ -3,7 +3,7 @@
using System;
using System.Binary;
using Channels;
using System.IO.Pipelines;
namespace Microsoft.Extensions.WebSockets.Internal
{

View File

@ -3,17 +3,17 @@
using System.Threading;
using System.Threading.Tasks;
using Channels;
using System.IO.Pipelines;
namespace Microsoft.Extensions.WebSockets.Internal
{
public static class ChannelExtensions
public static class PipelineReaderExtensions
{
public static ValueTask<ChannelReadResult> ReadAtLeastAsync(this IReadableChannel input, int minimumRequiredBytes) => ReadAtLeastAsync(input, minimumRequiredBytes, CancellationToken.None);
public static ValueTask<ReadResult> ReadAtLeastAsync(this IPipelineReader input, int minimumRequiredBytes) => ReadAtLeastAsync(input, minimumRequiredBytes, CancellationToken.None);
// TODO: Pull this up to Channels. We should be able to do it there without allocating a Task<T> in any case (rather than here where we can avoid allocation
// only if the buffer is already ready and has enough data)
public static ValueTask<ChannelReadResult> ReadAtLeastAsync(this IReadableChannel input, int minimumRequiredBytes, CancellationToken cancellationToken)
public static ValueTask<ReadResult> ReadAtLeastAsync(this IPipelineReader input, int minimumRequiredBytes, CancellationToken cancellationToken)
{
var awaiter = input.ReadAsync(/* cancellationToken */);
@ -25,7 +25,7 @@ namespace Microsoft.Extensions.WebSockets.Internal
if (result.IsCompleted || result.Buffer.Length >= minimumRequiredBytes)
{
return new ValueTask<ChannelReadResult>(result);
return new ValueTask<ReadResult>(result);
}
// Buffer wasn't big enough, mark it as examined and continue to the "slow" path below
@ -33,10 +33,10 @@ namespace Microsoft.Extensions.WebSockets.Internal
consumed: result.Buffer.Start,
examined: result.Buffer.End);
}
return new ValueTask<ChannelReadResult>(ReadAtLeastSlowAsync(awaiter, input, minimumRequiredBytes, cancellationToken));
return new ValueTask<ReadResult>(ReadAtLeastSlowAsync(awaiter, input, minimumRequiredBytes, cancellationToken));
}
private static async Task<ChannelReadResult> ReadAtLeastSlowAsync(ReadableChannelAwaitable awaitable, IReadableChannel input, int minimumRequiredBytes, CancellationToken cancellationToken)
private static async Task<ReadResult> ReadAtLeastSlowAsync(ReadableBufferAwaitable awaitable, IPipelineReader input, int minimumRequiredBytes, CancellationToken cancellationToken)
{
var result = await awaitable;
while (!result.IsCompleted && result.Buffer.Length < minimumRequiredBytes)

View File

@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Channels;
using System.IO.Pipelines;
namespace Microsoft.Extensions.WebSockets.Internal
{

View File

@ -2,9 +2,10 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Binary;
using System.IO.Pipelines;
using System.IO.Pipelines.Text.Primitives;
using System.Text;
using Channels;
using Channels.Text.Primitives;
using System.Text.Formatting;
namespace Microsoft.Extensions.WebSockets.Internal
{
@ -70,7 +71,7 @@ namespace Microsoft.Extensions.WebSockets.Internal
buffer.WriteBigEndian((ushort)Status);
if (!string.IsNullOrEmpty(Description))
{
buffer.WriteUtf8String(Description);
buffer.Append(Description, EncodingData.TextEncoding.Utf8);
}
}
}

View File

@ -5,10 +5,10 @@ using System;
using System.Binary;
using System.Diagnostics;
using System.Globalization;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal
{
@ -29,8 +29,8 @@ namespace Microsoft.Extensions.WebSockets.Internal
{
private WebSocketOptions _options;
private readonly byte[] _maskingKeyBuffer;
private readonly IReadableChannel _inbound;
private readonly IWritableChannel _outbound;
private readonly IPipelineReader _inbound;
private readonly IPipelineWriter _outbound;
private readonly CancellationTokenSource _terminateReceiveCts = new CancellationTokenSource();
private readonly Timer _pinger;
private readonly CancellationTokenSource _timerCts = new CancellationTokenSource();
@ -45,36 +45,36 @@ namespace Microsoft.Extensions.WebSockets.Internal
public WebSocketConnectionState State { get; private set; } = WebSocketConnectionState.Created;
/// <summary>
/// Constructs a new, unmasked, <see cref="WebSocketConnection"/> from an <see cref="IReadableChannel"/> and an <see cref="IWritableChannel"/> that represents an established WebSocket connection (i.e. after handshaking)
/// Constructs a new, unmasked, <see cref="WebSocketConnection"/> from an <see cref="IPipelineReader"/> and an <see cref="IPipelineWriter"/> that represents an established WebSocket connection (i.e. after handshaking)
/// </summary>
/// <param name="inbound">A <see cref="IReadableChannel"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IWritableChannel"/> to which frame will be written when sending.</param>
public WebSocketConnection(IReadableChannel inbound, IWritableChannel outbound) : this(inbound, outbound, options: WebSocketOptions.DefaultUnmasked) { }
/// <param name="inbound">A <see cref="IPipelineReader"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IPipelineWriter"/> to which frame will be written when sending.</param>
public WebSocketConnection(IPipelineReader inbound, IPipelineWriter outbound) : this(inbound, outbound, options: WebSocketOptions.DefaultUnmasked) { }
/// <summary>
/// Constructs a new, unmasked, <see cref="WebSocketConnection"/> from an <see cref="IReadableChannel"/> and an <see cref="IWritableChannel"/> that represents an established WebSocket connection (i.e. after handshaking)
/// Constructs a new, unmasked, <see cref="WebSocketConnection"/> from an <see cref="IPipelineReader"/> and an <see cref="IPipelineWriter"/> that represents an established WebSocket connection (i.e. after handshaking)
/// </summary>
/// <param name="inbound">A <see cref="IReadableChannel"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IWritableChannel"/> to which frame will be written when sending.</param>
/// <param name="inbound">A <see cref="IPipelineReader"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IPipelineWriter"/> to which frame will be written when sending.</param>
/// <param name="subProtocol">The sub-protocol provided during handshaking</param>
public WebSocketConnection(IReadableChannel inbound, IWritableChannel outbound, string subProtocol) : this(inbound, outbound, subProtocol, options: WebSocketOptions.DefaultUnmasked) { }
public WebSocketConnection(IPipelineReader inbound, IPipelineWriter outbound, string subProtocol) : this(inbound, outbound, subProtocol, options: WebSocketOptions.DefaultUnmasked) { }
/// <summary>
/// Constructs a new, <see cref="WebSocketConnection"/> from an <see cref="IReadableChannel"/> and an <see cref="IWritableChannel"/> that represents an established WebSocket connection (i.e. after handshaking)
/// Constructs a new, <see cref="WebSocketConnection"/> from an <see cref="IPipelineReader"/> and an <see cref="IPipelineWriter"/> that represents an established WebSocket connection (i.e. after handshaking)
/// </summary>
/// <param name="inbound">A <see cref="IReadableChannel"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IWritableChannel"/> to which frame will be written when sending.</param>
/// <param name="inbound">A <see cref="IPipelineReader"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IPipelineWriter"/> to which frame will be written when sending.</param>
/// <param name="options">A <see cref="WebSocketOptions"/> which provides the configuration options for the socket.</param>
public WebSocketConnection(IReadableChannel inbound, IWritableChannel outbound, WebSocketOptions options) : this(inbound, outbound, subProtocol: string.Empty, options: options) { }
public WebSocketConnection(IPipelineReader inbound, IPipelineWriter outbound, WebSocketOptions options) : this(inbound, outbound, subProtocol: string.Empty, options: options) { }
/// <summary>
/// Constructs a new <see cref="WebSocketConnection"/> from an <see cref="IReadableChannel"/> and an <see cref="IWritableChannel"/> that represents an established WebSocket connection (i.e. after handshaking)
/// Constructs a new <see cref="WebSocketConnection"/> from an <see cref="IPipelineReader"/> and an <see cref="IPipelineWriter"/> that represents an established WebSocket connection (i.e. after handshaking)
/// </summary>
/// <param name="inbound">A <see cref="IReadableChannel"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IWritableChannel"/> to which frame will be written when sending.</param>
/// <param name="inbound">A <see cref="IPipelineReader"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IPipelineWriter"/> to which frame will be written when sending.</param>
/// <param name="subProtocol">The sub-protocol provided during handshaking</param>
/// <param name="options">A <see cref="WebSocketOptions"/> which provides the configuration options for the socket.</param>
public WebSocketConnection(IReadableChannel inbound, IWritableChannel outbound, string subProtocol, WebSocketOptions options)
public WebSocketConnection(IPipelineReader inbound, IPipelineWriter outbound, string subProtocol, WebSocketOptions options)
{
_inbound = inbound;
_outbound = outbound;

View File

@ -1,7 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Channels;
using System.IO.Pipelines;
namespace Microsoft.Extensions.WebSockets.Internal
{

View File

@ -48,15 +48,15 @@ namespace Microsoft.Extensions.WebSockets.Internal
public static class WebSocketOpcodeExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool IsControl(this WebSocketOpcode self)
public static bool IsControl(this WebSocketOpcode opcode)
{
return self >= WebSocketOpcode.Close;
return opcode >= WebSocketOpcode.Close;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool IsMessage(this WebSocketOpcode self)
public static bool IsMessage(this WebSocketOpcode opcode)
{
return self < WebSocketOpcode.Close;
return opcode < WebSocketOpcode.Close;
}
}
}

View File

@ -23,8 +23,8 @@
},
"dependencies": {
"Channels": "0.2.0-beta-*",
"Channels.Text.Primitives": "0.2.0-beta-*",
"System.IO.Pipelines": "0.1.0-*",
"System.IO.Pipelines.Text.Primitives": "0.1.0-*",
"Microsoft.Extensions.TaskCache.Sources": {
"version": "1.2.0-*",
"type": "build"

View File

@ -3,9 +3,9 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Threading.Tasks;
using Channels;
using Xunit;
namespace Microsoft.AspNetCore.Sockets.Tests
@ -42,8 +42,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[Fact]
public void AddNewConnection()
{
using (var factory = new ChannelFactory())
using (var channel = new HttpChannel(factory))
using (var factory = new PipelineFactory())
using (var channel = new HttpConnection(factory))
{
var connectionManager = new ConnectionManager();
var state = connectionManager.AddNewConnection(channel);
@ -62,8 +62,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[Fact]
public void RemoveConnection()
{
using (var factory = new ChannelFactory())
using (var channel = new HttpChannel(factory))
using (var factory = new PipelineFactory())
using (var channel = new HttpConnection(factory))
{
var connectionManager = new ConnectionManager();
var state = connectionManager.AddNewConnection(channel);

View File

@ -4,10 +4,10 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Internal;
using Microsoft.Extensions.Primitives;
@ -21,7 +21,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public async Task GetIdReservesConnectionIdAndReturnsIt()
{
var manager = new ConnectionManager();
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();
@ -44,7 +44,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var manager = new ConnectionManager();
var state = manager.ReserveConnection();
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();
@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public async Task SendingToUnknownConnectionIdThrows()
{
var manager = new ConnectionManager();
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();
@ -84,7 +84,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public async Task SendingWithoutConnectionIdThrows()
{
var manager = new ConnectionManager();
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var dispatcher = new HttpConnectionDispatcher(manager, factory, loggerFactory: null);
var context = new DefaultHttpContext();

View File

@ -4,10 +4,10 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Xunit;
@ -18,11 +18,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[Fact]
public async Task Set204StatusCodeWhenChannelComplete()
{
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var connection = new Connection();
connection.ConnectionId = Guid.NewGuid().ToString();
var channel = new HttpChannel(factory);
var channel = new HttpConnection(factory);
connection.Channel = channel;
var context = new DefaultHttpContext();
var poll = new LongPolling(connection);
@ -38,11 +38,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[Fact]
public async Task NoFramingAddedWhenDataSent()
{
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var connection = new Connection();
connection.ConnectionId = Guid.NewGuid().ToString();
var channel = new HttpChannel(factory);
var channel = new HttpConnection(factory);
connection.Channel = channel;
var context = new DefaultHttpContext();
var ms = new MemoryStream();

View File

@ -4,10 +4,10 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Http;
using Xunit;
@ -18,16 +18,16 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[Fact]
public async Task SSESetsContentType()
{
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var connection = new Connection();
connection.ConnectionId = Guid.NewGuid().ToString();
var channel = new HttpChannel(factory);
connection.Channel = channel;
var httpConnection = new HttpConnection(factory);
connection.Channel = httpConnection;
var sse = new ServerSentEvents(connection);
var context = new DefaultHttpContext();
channel.Output.CompleteWriter();
httpConnection.Output.CompleteWriter();
await sse.ProcessRequestAsync(context);
@ -39,20 +39,20 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[Fact]
public async Task SSEAddsAppropriateFraming()
{
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var connection = new Connection();
connection.ConnectionId = Guid.NewGuid().ToString();
var channel = new HttpChannel(factory);
connection.Channel = channel;
var httpConnection = new HttpConnection(factory);
connection.Channel = httpConnection;
var sse = new ServerSentEvents(connection);
var context = new DefaultHttpContext();
var ms = new MemoryStream();
context.Response.Body = ms;
await channel.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
await httpConnection.Output.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
channel.Output.CompleteWriter();
httpConnection.Output.CompleteWriter();
await sse.ProcessRequestAsync(context);

View File

@ -1,9 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using Channels;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests

View File

@ -3,16 +3,15 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
public static class WebSocketConnectionExtensions
{
public static async Task<WebSocketConnectionSummary> ExecuteAndCaptureFramesAsync(this IWebSocketConnection self)
public static async Task<WebSocketConnectionSummary> ExecuteAndCaptureFramesAsync(this IWebSocketConnection connection)
{
var frames = new List<WebSocketFrame>();
var closeResult = await self.ExecuteAsync(frame => frames.Add(frame.Copy()));
var closeResult = await connection.ExecuteAsync(frame => frames.Add(frame.Copy()));
return new WebSocketConnectionSummary(frames, closeResult);
}
}

View File

@ -2,10 +2,10 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.Extensions.Internal;
using Xunit;

View File

@ -3,10 +3,10 @@
using System;
using System.Globalization;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests

View File

@ -2,9 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests

View File

@ -2,10 +2,10 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Channels;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
@ -192,12 +192,12 @@ namespace Microsoft.Extensions.WebSockets.Internal.Tests
}
}
private static async Task<WebSocketConnectionSummary> RunReceiveTest(Func<IWritableChannel, CancellationToken, Task> producer)
private static async Task<WebSocketConnectionSummary> RunReceiveTest(Func<IPipelineWriter, CancellationToken, Task> producer)
{
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var outbound = factory.CreateChannel();
var inbound = factory.CreateChannel();
var outbound = factory.Create();
var inbound = factory.Create();
var timeoutToken = TestUtil.CreateTimeoutToken();

View File

@ -2,9 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.Extensions.Internal;
using Xunit;
@ -172,10 +172,10 @@ namespace Microsoft.Extensions.WebSockets.Internal.Tests
private static async Task<byte[]> RunSendTest(Func<WebSocketConnection, Task> producer, WebSocketOptions options)
{
using (var factory = new ChannelFactory())
using (var factory = new PipelineFactory())
{
var outbound = factory.CreateChannel();
var inbound = factory.CreateChannel();
var outbound = factory.Create();
var inbound = factory.Create();
Task executeTask;
using (var connection = new WebSocketConnection(inbound, outbound, options))
@ -197,12 +197,12 @@ namespace Microsoft.Extensions.WebSockets.Internal.Tests
}
}
private static void CompleteChannels(params Channel[] channels)
private static void CompleteChannels(params PipelineReaderWriter[] readerWriters)
{
foreach (var channel in channels)
foreach (var readerWriter in readerWriters)
{
channel.CompleteReader();
channel.CompleteWriter();
readerWriter.CompleteReader();
readerWriter.CompleteWriter();
}
}
}

View File

@ -1,10 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests

View File

@ -2,21 +2,21 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Channels;
using System.IO.Pipelines;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
internal class WebSocketPair : IDisposable
{
private ChannelFactory _factory;
private PipelineFactory _factory;
public Channel ServerToClient { get; }
public Channel ClientToServer { get; }
public PipelineReaderWriter ServerToClient { get; }
public PipelineReaderWriter ClientToServer { get; }
public IWebSocketConnection ClientSocket { get; }
public IWebSocketConnection ServerSocket { get; }
public WebSocketPair(ChannelFactory factory, Channel serverToClient, Channel clientToServer, IWebSocketConnection clientSocket, IWebSocketConnection serverSocket)
public WebSocketPair(PipelineFactory factory, PipelineReaderWriter serverToClient, PipelineReaderWriter clientToServer, IWebSocketConnection clientSocket, IWebSocketConnection serverSocket)
{
_factory = factory;
ServerToClient = serverToClient;
@ -30,9 +30,9 @@ namespace Microsoft.Extensions.WebSockets.Internal.Tests
public static WebSocketPair Create(WebSocketOptions serverOptions, WebSocketOptions clientOptions)
{
// Create channels
var factory = new ChannelFactory();
var serverToClient = factory.CreateChannel();
var clientToServer = factory.CreateChannel();
var factory = new PipelineFactory();
var serverToClient = factory.Create();
var clientToServer = factory.Create();
var serverSocket = new WebSocketConnection(clientToServer, serverToClient, options: serverOptions);
var clientSocket = new WebSocketConnection(serverToClient, clientToServer, options: clientOptions);

View File

@ -1,9 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
@ -20,11 +20,11 @@ namespace WebSocketsTestApp
// For more information on how to configure your application, visit http://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ChannelFactory>();
services.AddSingleton<PipelineFactory>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, ChannelFactory channelFactory)
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, PipelineFactory pipelineFactory)
{
loggerFactory.AddConsole(LogLevel.Debug);
@ -33,7 +33,7 @@ namespace WebSocketsTestApp
app.UseDeveloperExceptionPage();
}
app.UseWebSocketConnections(new ChannelFactory());
app.UseWebSocketConnections(pipelineFactory);
app.Use(async (context, next) =>
{