From 03352354dc495e3714dbf1e6e738acf807cf4abc Mon Sep 17 00:00:00 2001 From: David Fowler Date: Fri, 30 Sep 2016 01:44:56 -0700 Subject: [PATCH 01/99] Initial commit --- .gitattributes | 52 +++++++ .gitignore | 33 +++++ LICENSE.md | 13 ++ NuGet.config | 7 + WebApplication95.sln | 32 +++++ global.json | 6 + src/WebApplication95/Bus.cs | 58 ++++++++ src/WebApplication95/Connection.cs | 52 +++++++ src/WebApplication95/ConnectionManager.cs | 102 ++++++++++++++ src/WebApplication95/ConnectionState.cs | 11 ++ src/WebApplication95/Dispatcher.cs | 143 ++++++++++++++++++++ src/WebApplication95/IDispatcher.cs | 9 ++ src/WebApplication95/LongPolling.cs | 138 +++++++++++++++++++ src/WebApplication95/Program.cs | 27 ++++ src/WebApplication95/ServerSentEvents.cs | 126 +++++++++++++++++ src/WebApplication95/Startup.cs | 36 +++++ src/WebApplication95/WebApplication95.xproj | 25 ++++ src/WebApplication95/WebSockets.cs | 86 ++++++++++++ src/WebApplication95/project.json | 50 +++++++ src/WebApplication95/web.config | 14 ++ src/WebApplication95/wwwroot/index.html | 55 ++++++++ src/WebApplication95/wwwroot/polling.html | 61 +++++++++ 22 files changed, 1136 insertions(+) create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 LICENSE.md create mode 100644 NuGet.config create mode 100644 WebApplication95.sln create mode 100644 global.json create mode 100644 src/WebApplication95/Bus.cs create mode 100644 src/WebApplication95/Connection.cs create mode 100644 src/WebApplication95/ConnectionManager.cs create mode 100644 src/WebApplication95/ConnectionState.cs create mode 100644 src/WebApplication95/Dispatcher.cs create mode 100644 src/WebApplication95/IDispatcher.cs create mode 100644 src/WebApplication95/LongPolling.cs create mode 100644 src/WebApplication95/Program.cs create mode 100644 src/WebApplication95/ServerSentEvents.cs create mode 100644 src/WebApplication95/Startup.cs create mode 100644 src/WebApplication95/WebApplication95.xproj create mode 100644 src/WebApplication95/WebSockets.cs create mode 100644 src/WebApplication95/project.json create mode 100644 src/WebApplication95/web.config create mode 100644 src/WebApplication95/wwwroot/index.html create mode 100644 src/WebApplication95/wwwroot/polling.html diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000000..c2f0f84273 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,52 @@ +*.doc diff=astextplain +*.DOC diff=astextplain +*.docx diff=astextplain +*.DOCX diff=astextplain +*.dot diff=astextplain +*.DOT diff=astextplain +*.pdf diff=astextplain +*.PDF diff=astextplain +*.rtf diff=astextplain +*.RTF diff=astextplain + +*.jpg binary +*.png binary +*.gif binary + +*.cs text=auto diff=csharp +*.vb text=auto +*.resx text=auto +*.c text=auto +*.cpp text=auto +*.cxx text=auto +*.h text=auto +*.hxx text=auto +*.py text=auto +*.rb text=auto +*.java text=auto +*.html text=auto +*.htm text=auto +*.css text=auto +*.scss text=auto +*.sass text=auto +*.less text=auto +*.js text=auto +*.lisp text=auto +*.clj text=auto +*.sql text=auto +*.php text=auto +*.lua text=auto +*.m text=auto +*.asm text=auto +*.erl text=auto +*.fs text=auto +*.fsx text=auto +*.hs text=auto + +*.csproj text=auto +*.vbproj text=auto +*.fsproj text=auto +*.dbproj text=auto +*.sln text=auto eol=crlf + +*.sh eol=lf \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000..9c43d9d9a3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +[Oo]bj/ +[Bb]in/ +TestResults/ +.nuget/ +*.sln.ide/ +_ReSharper.*/ +packages/ +artifacts/ +PublishProfiles/ +.vs/ +*.user +*.suo +*.cache +*.docstates +_ReSharper.* +nuget.exe +*net45.csproj +*net451.csproj +*k10.csproj +*.psess +*.vsp +*.pidb +*.userprefs +*DS_Store +*.ncrunchsolution +*.*sdf +*.ipch +project.lock.json +runtimes/ +.build/ +.testPublish/ +launchSettings.json +*.tmp diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000000..be67c6cd06 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,13 @@ +Copyright (c) David Fowler All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You may +obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing permissions +and limitations under the License. diff --git a/NuGet.config b/NuGet.config new file mode 100644 index 0000000000..3ce9e7f223 --- /dev/null +++ b/NuGet.config @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/WebApplication95.sln b/WebApplication95.sln new file mode 100644 index 0000000000..607b118e87 --- /dev/null +++ b/WebApplication95.sln @@ -0,0 +1,32 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 14 +VisualStudioVersion = 14.0.25420.1 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-5398-4884-87E4-B816698CDE65}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{83B2C3EB-A3D8-4E6F-9A3C-A380B005EF31}" + ProjectSection(SolutionItems) = preProject + global.json = global.json + EndProjectSection +EndProject +Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "WebApplication95", "src\WebApplication95\WebApplication95.xproj", "{52ED8B3A-2DBB-448A-A708-FAA0783B7917}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {52ED8B3A-2DBB-448A-A708-FAA0783B7917}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {52ED8B3A-2DBB-448A-A708-FAA0783B7917}.Debug|Any CPU.Build.0 = Debug|Any CPU + {52ED8B3A-2DBB-448A-A708-FAA0783B7917}.Release|Any CPU.ActiveCfg = Release|Any CPU + {52ED8B3A-2DBB-448A-A708-FAA0783B7917}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {52ED8B3A-2DBB-448A-A708-FAA0783B7917} = {DA69F624-5398-4884-87E4-B816698CDE65} + EndGlobalSection +EndGlobal diff --git a/global.json b/global.json new file mode 100644 index 0000000000..e793049cd5 --- /dev/null +++ b/global.json @@ -0,0 +1,6 @@ +{ + "projects": [ "src", "test" ], + "sdk": { + "version": "1.0.0-preview2-003121" + } +} diff --git a/src/WebApplication95/Bus.cs b/src/WebApplication95/Bus.cs new file mode 100644 index 0000000000..8a03b6f5f2 --- /dev/null +++ b/src/WebApplication95/Bus.cs @@ -0,0 +1,58 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace WebApplication95 +{ + public class Message + { + public string ContentType { get; set; } + public ArraySegment Payload { get; set; } + } + + public class Bus + { + private readonly ConcurrentDictionary>> _subscriptions = new ConcurrentDictionary>>(); + + public IDisposable Subscribe(string key, IObserver observer) + { + var connections = _subscriptions.GetOrAdd(key, _ => new List>()); + connections.Add(observer); + + return new DisposableAction(() => + { + connections.Remove(observer); + }); + } + + public void Publish(string key, Message message) + { + List> connections; + if (_subscriptions.TryGetValue(key, out connections)) + { + foreach (var c in connections) + { + c.OnNext(message); + } + } + } + + private class DisposableAction : IDisposable + { + private Action _action; + + public DisposableAction(Action action) + { + _action = action; + } + + public void Dispose() + { + Interlocked.Exchange(ref _action, () => { }).Invoke(); + } + } + } +} diff --git a/src/WebApplication95/Connection.cs b/src/WebApplication95/Connection.cs new file mode 100644 index 0000000000..0ab18ecd97 --- /dev/null +++ b/src/WebApplication95/Connection.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Channels; + +namespace WebApplication95 +{ + public enum TransportType + { + LongPolling, + WebSockets, + ServerSentEvents + } + + public class Connection : IChannel + { + public TransportType TransportType { get; set; } + + public string ConnectionId { get; set; } + + IReadableChannel IChannel.Input => Input; + + IWritableChannel IChannel.Output => Output; + + internal Channel Input { get; set; } + + internal Channel Output { get; set; } + + public Connection() + { + Stream = new ChannelStream(this); + } + + public Stream Stream { get; } + + public void Complete() + { + Input.CompleteReader(); + Input.CompleteWriter(); + + Output.CompleteReader(); + Output.CompleteWriter(); + } + + public void Dispose() + { + + } + } +} diff --git a/src/WebApplication95/ConnectionManager.cs b/src/WebApplication95/ConnectionManager.cs new file mode 100644 index 0000000000..7acf181a93 --- /dev/null +++ b/src/WebApplication95/ConnectionManager.cs @@ -0,0 +1,102 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Channels; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Primitives; + +namespace WebApplication95 +{ + public class ConnectionManager + { + private ConcurrentDictionary _connections = new ConcurrentDictionary(); + private readonly ChannelFactory _channelFactory = new ChannelFactory(); + private Timer _timer; + + public ConnectionManager() + { + _timer = new Timer(Scan, this, 0, 1000); + } + + private static void Scan(object state) + { + ((ConnectionManager)state).Scan(); + } + + private void Scan() + { + foreach (var c in _connections) + { + if (!c.Value.Alive && (DateTimeOffset.UtcNow - c.Value.LastSeen).TotalSeconds > 30) + { + ConnectionState s; + if (_connections.TryRemove(c.Key, out s)) + { + s.Connection.Complete(); + } + else + { + + } + } + } + } + + public string GetConnectionId(HttpContext context) + { + // REVIEW: Only check the query string for longpolling + var id = context.Request.Query["id"]; + + if (!StringValues.IsNullOrEmpty(id)) + { + return id.ToString(); + } + + return Guid.NewGuid().ToString(); + } + + public bool TryGetConnection(string id, out ConnectionState state) + { + return _connections.TryGetValue(id, out state); + } + + public bool AddConnection(string id, out ConnectionState state) + { + state = _connections.GetOrAdd(id, connectionId => new ConnectionState()); + var isNew = state.Connection == null; + if (isNew) + { + state.Connection = new Connection + { + ConnectionId = id, + Input = _channelFactory.CreateChannel(), + Output = _channelFactory.CreateChannel() + }; + } + state.LastSeen = DateTimeOffset.UtcNow; + state.Alive = true; + return isNew; + } + + public void MarkConnectionDead(string id) + { + ConnectionState state; + if (_connections.TryGetValue(id, out state)) + { + state.Alive = false; + } + } + + public void RemoveConnection(string id) + { + ConnectionState state; + if (_connections.TryRemove(id, out state)) + { + + } + } + } +} diff --git a/src/WebApplication95/ConnectionState.cs b/src/WebApplication95/ConnectionState.cs new file mode 100644 index 0000000000..b615c798ff --- /dev/null +++ b/src/WebApplication95/ConnectionState.cs @@ -0,0 +1,11 @@ +using System; + +namespace WebApplication95 +{ + public class ConnectionState + { + public DateTimeOffset LastSeen { get; set; } + public bool Alive { get; set; } = true; + public Connection Connection { get; set; } + } +} diff --git a/src/WebApplication95/Dispatcher.cs b/src/WebApplication95/Dispatcher.cs new file mode 100644 index 0000000000..19dd8e4d9a --- /dev/null +++ b/src/WebApplication95/Dispatcher.cs @@ -0,0 +1,143 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Channels; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Primitives; + +namespace WebApplication95 +{ + public class Dispatcher + { + private readonly ConnectionManager _manager = new ConnectionManager(); + private readonly EndPoint _endpoint = new EndPoint(); + + public async Task Execute(HttpContext context) + { + if (context.Request.Path.StartsWithSegments("/send")) + { + var connectionId = context.Request.Query["id"]; + + if (StringValues.IsNullOrEmpty(connectionId)) + { + throw new InvalidOperationException("Missing connection id"); + } + + ConnectionState state; + if (_manager.TryGetConnection(connectionId, out state)) + { + // Write the message length + await context.Request.Body.CopyToAsync(state.Connection.Input); + } + } + else + { + var connectionId = _manager.GetConnectionId(context); + + // Outgoing channels + if (context.Request.Path.StartsWithSegments("/sse")) + { + ConnectionState state; + _manager.AddConnection(connectionId, out state); + + var sse = new ServerSentEvents(state); + + var ignore = _endpoint.OnConnected(state.Connection); + + state.Connection.TransportType = TransportType.ServerSentEvents; + + await sse.ProcessRequest(context); + + state.Connection.Complete(); + + _manager.RemoveConnection(connectionId); + } + else if (context.Request.Path.StartsWithSegments("/ws")) + { + ConnectionState state; + _manager.AddConnection(connectionId, out state); + + var ws = new WebSockets(state); + + var ignore = _endpoint.OnConnected(state.Connection); + + state.Connection.TransportType = TransportType.WebSockets; + + await ws.ProcessRequest(context); + + state.Connection.Complete(); + + _manager.RemoveConnection(connectionId); + } + else if (context.Request.Path.StartsWithSegments("/poll")) + { + ConnectionState state; + bool newConnection = false; + if (_manager.AddConnection(connectionId, out state)) + { + newConnection = true; + var ignore = _endpoint.OnConnected(state.Connection); + state.Connection.TransportType = TransportType.LongPolling; + } + + var longPolling = new LongPolling(state); + + await longPolling.ProcessRequest(newConnection, context); + + _manager.MarkConnectionDead(connectionId); + } + + } + } + } + + public class EndPoint + { + private List _connections = new List(); + + public virtual async Task OnConnected(Connection connection) + { + lock (_connections) + { + _connections.Add(connection); + } + + // Echo server + while (true) + { + var input = await connection.Input.ReadAsync(); + try + { + if (input.IsEmpty && connection.Input.Reading.IsCompleted) + { + break; + } + + List connections = null; + lock (_connections) + { + connections = _connections; + } + + foreach (var c in connections) + { + var output = c.Output.Alloc(); + output.Append(ref input); + await output.FlushAsync(); + } + } + finally + { + connection.Input.Advance(input.End); + } + } + + lock (_connections) + { + _connections.Remove(connection); + } + } + } +} diff --git a/src/WebApplication95/IDispatcher.cs b/src/WebApplication95/IDispatcher.cs new file mode 100644 index 0000000000..2beada37bb --- /dev/null +++ b/src/WebApplication95/IDispatcher.cs @@ -0,0 +1,9 @@ +using System; + +namespace WebApplication95 +{ + public interface IDispatcher + { + void OnIncoming(ArraySegment data); + } +} \ No newline at end of file diff --git a/src/WebApplication95/LongPolling.cs b/src/WebApplication95/LongPolling.cs new file mode 100644 index 0000000000..7a6da27114 --- /dev/null +++ b/src/WebApplication95/LongPolling.cs @@ -0,0 +1,138 @@ +using System; +using System.Text; +using System.Threading.Tasks; +using Channels; +using Microsoft.AspNetCore.Http; + +namespace WebApplication95 +{ + public class LongPolling + { + private Task _lastTask; + private object _lockObj = new object(); + private bool _completed; + private TaskCompletionSource _initTcs = new TaskCompletionSource(); + private TaskCompletionSource _lifetime = new TaskCompletionSource(); + private HttpContext _context; + private readonly ConnectionState _state; + + public LongPolling(ConnectionState state) + { + _lastTask = _initTcs.Task; + _state = state; + } + + private Task Post(Func work, object state) + { + if (_completed) + { + return _lastTask; + } + + lock (_lockObj) + { + _lastTask = _lastTask.ContinueWith((t, s1) => work(s1), state).Unwrap(); + } + + return _lastTask; + } + + public async Task ProcessRequest(bool newConnection, HttpContext context) + { + context.Response.ContentType = "application/json"; + + // End the connection if the client goes away + context.RequestAborted.Register(state => OnConnectionAborted(state), this); + + _context = context; + + _initTcs.TrySetResult(null); + + if (newConnection) + { + // Flush the connection id to the connection + var ignore = Send(default(ArraySegment)); + } + else + { + // Send queue messages to the connection + var ignore = ProcessMessages(context); + } + + + await _lifetime.Task; + + _completed = true; + } + + private async Task ProcessMessages(HttpContext context) + { + var buffer = await _state.Connection.Output.ReadAsync(); + + foreach (var memory in buffer) + { + ArraySegment data; + if (memory.TryGetArray(out data)) + { + await Send(data); + // Advance the buffer one block of memory + _state.Connection.Output.Advance(buffer.Slice(memory.Length).Start); + break; + } + } + } + + private static void OnConnectionAborted(object state) + { + ((LongPolling)state).CompleteRequest(); + } + + private void CompleteRequest() + { + Post(state => + { + ((TaskCompletionSource)state).TrySetResult(null); + return Task.CompletedTask; + }, + _lifetime); + } + + public async Task Send(ArraySegment value) + { + await Post(async state => + { + var data = ((ArraySegment)state); + // + 100 = laziness + var buffer = new byte[data.Count + _state.Connection.ConnectionId.Length + 100]; + var at = 0; + buffer[at++] = (byte)'{'; + buffer[at++] = (byte)'"'; + buffer[at++] = (byte)'c'; + buffer[at++] = (byte)'"'; + buffer[at++] = (byte)':'; + buffer[at++] = (byte)'"'; + int count = Encoding.UTF8.GetBytes(_state.Connection.ConnectionId, 0, _state.Connection.ConnectionId.Length, buffer, at); + at += count; + buffer[at++] = (byte)'"'; + if (data.Array != null) + { + buffer[at++] = (byte)','; + buffer[at++] = (byte)'"'; + buffer[at++] = (byte)'d'; + buffer[at++] = (byte)'"'; + buffer[at++] = (byte)':'; + //buffer[at++] = (byte)'"'; + Buffer.BlockCopy(data.Array, data.Offset, buffer, at, data.Count); + } + at += data.Count; + //buffer[at++] = (byte)'"'; + buffer[at++] = (byte)'}'; + _context.Response.ContentLength = at; + await _context.Response.Body.WriteAsync(buffer, 0, at); + }, + value); + + CompleteRequest(); + } + } +} diff --git a/src/WebApplication95/Program.cs b/src/WebApplication95/Program.cs new file mode 100644 index 0000000000..be7ba38402 --- /dev/null +++ b/src/WebApplication95/Program.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Hosting; + +namespace WebApplication95 +{ + public class Program + { + public static void Main(string[] args) + { + var host = new WebHostBuilder() + .UseKestrel(options => + { + options.UseConnectionLogging(); + }) + .UseContentRoot(Directory.GetCurrentDirectory()) + .UseIISIntegration() + .UseStartup() + .Build(); + + host.Run(); + } + } +} diff --git a/src/WebApplication95/ServerSentEvents.cs b/src/WebApplication95/ServerSentEvents.cs new file mode 100644 index 0000000000..0aaf8287ee --- /dev/null +++ b/src/WebApplication95/ServerSentEvents.cs @@ -0,0 +1,126 @@ +using System; +using System.Threading.Tasks; +using Channels; +using Microsoft.AspNetCore.Http; + +namespace WebApplication95 +{ + public class ServerSentEvents + { + private Task _lastTask; + private object _lockObj = new object(); + private bool _completed; + private TaskCompletionSource _initTcs = new TaskCompletionSource(); + private TaskCompletionSource _lifetime = new TaskCompletionSource(); + private HttpContext _context; + private readonly ConnectionState _state; + + public ServerSentEvents(ConnectionState state) + { + _state = state; + _lastTask = _initTcs.Task; + var ignore = StartSending(); + } + + private Task Post(Func work, object state) + { + if (_completed) + { + return _lastTask; + } + + lock (_lockObj) + { + _lastTask = _lastTask.ContinueWith((t, s1) => work(s1), state).Unwrap(); + } + + return _lastTask; + } + + public async Task ProcessRequest(HttpContext context) + { + context.Response.ContentType = "text/event-stream"; + + // End the connection if the client goes away + context.RequestAborted.Register(state => OnConnectionAborted(state), this); + + _context = context; + + await _context.Response.WriteAsync($"data: {_state.Connection.ConnectionId}\n\n"); + + // Set the initial TCS when everything is setup + _initTcs.TrySetResult(null); + + await _lifetime.Task; + + _completed = true; + } + + private static void OnConnectionAborted(object state) + { + ((ServerSentEvents)state).OnConnectedAborted(); + } + + private void OnConnectedAborted() + { + Post(state => + { + ((TaskCompletionSource)state).TrySetResult(null); + return Task.CompletedTask; + }, + _lifetime); + } + + private async Task StartSending() + { + await _initTcs.Task; + + while (true) + { + var buffer = await _state.Connection.Output.ReadAsync(); + + if (buffer.IsEmpty && _state.Connection.Output.Reading.IsCompleted) + { + break; + } + + foreach (var memory in buffer) + { + ArraySegment data; + if (memory.TryGetArray(out data)) + { + await Send(data); + } + } + + _state.Connection.Output.Advance(buffer.End); + } + + _state.Connection.Output.CompleteReader(); + } + + private Task Send(ArraySegment value) + { + return Post(async state => + { + var data = ((ArraySegment)state); + // TODO: Pooled buffers + // 8 = 6(data: ) + 2 (\n\n) + var buffer = new byte[8 + data.Count]; + var at = 0; + buffer[at++] = (byte)'d'; + buffer[at++] = (byte)'a'; + buffer[at++] = (byte)'t'; + buffer[at++] = (byte)'a'; + buffer[at++] = (byte)':'; + buffer[at++] = (byte)' '; + Buffer.BlockCopy(data.Array, data.Offset, buffer, at, data.Count); + at += data.Count; + buffer[at++] = (byte)'\n'; + buffer[at++] = (byte)'\n'; + await _context.Response.Body.WriteAsync(buffer, 0, at); + }, + value); + } + } +} diff --git a/src/WebApplication95/Startup.cs b/src/WebApplication95/Startup.cs new file mode 100644 index 0000000000..8ddf30b1fa --- /dev/null +++ b/src/WebApplication95/Startup.cs @@ -0,0 +1,36 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace WebApplication95 +{ + public class Startup + { + // This method gets called by the runtime. Use this method to add services to the container. + // For more information on how to configure your application, visit http://go.microsoft.com/fwlink/?LinkID=398940 + public void ConfigureServices(IServiceCollection services) + { + } + + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. + public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) + { + loggerFactory.AddConsole(LogLevel.Debug); + + app.UseFileServer(); + + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } + + var dispatcher = new Dispatcher(); + + app.Run(async (context) => + { + await dispatcher.Execute(context); + }); + } + } +} diff --git a/src/WebApplication95/WebApplication95.xproj b/src/WebApplication95/WebApplication95.xproj new file mode 100644 index 0000000000..85dbf4bbcb --- /dev/null +++ b/src/WebApplication95/WebApplication95.xproj @@ -0,0 +1,25 @@ + + + + 14.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + + + + + 52ed8b3a-2dbb-448a-a708-faa0783b7917 + WebApplication95 + .\obj + .\bin\ + v4.5.2 + + + + 2.0 + + + + + + + diff --git a/src/WebApplication95/WebSockets.cs b/src/WebApplication95/WebSockets.cs new file mode 100644 index 0000000000..a9ab616d96 --- /dev/null +++ b/src/WebApplication95/WebSockets.cs @@ -0,0 +1,86 @@ +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using Channels; +using Microsoft.AspNetCore.Http; + +namespace WebApplication95 +{ + public class WebSockets + { + private WebSocket _ws; + private ConnectionState _state; + private TaskCompletionSource _tcs = new TaskCompletionSource(); + + public WebSockets(ConnectionState state) + { + _state = state; + var ignore = StartSending(); + } + + private async Task StartSending() + { + await _tcs.Task; + + while (true) + { + var buffer = await _state.Connection.Output.ReadAsync(); + + if (buffer.IsEmpty && _state.Connection.Output.Reading.IsCompleted) + { + break; + } + + foreach (var memory in buffer) + { + ArraySegment data; + if (memory.TryGetArray(out data)) + { + await _ws.SendAsync(data, WebSocketMessageType.Text, endOfMessage: true, cancellationToken: CancellationToken.None); + } + } + + _state.Connection.Output.Advance(buffer.End); + } + + _state.Connection.Output.CompleteReader(); + } + + public async Task ProcessRequest(HttpContext context) + { + if (!context.WebSockets.IsWebSocketRequest) + { + await Task.CompletedTask; + return; + } + + var ws = await context.WebSockets.AcceptWebSocketAsync(); + + _ws = ws; + + _tcs.TrySetResult(null); + + var buffer = new byte[2048]; + while (true) + { + var result = await ws.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); + + // TODO: Fragments + if (result.MessageType == WebSocketMessageType.Text) + { + await _state.Connection.Input.WriteAsync(new Span(buffer, 0, result.Count)); + } + else if (result.MessageType == WebSocketMessageType.Binary) + { + await _state.Connection.Input.WriteAsync(new Span(buffer, 0, result.Count)); + } + else if (result.MessageType == WebSocketMessageType.Close) + { + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); + break; + } + } + } + } +} diff --git a/src/WebApplication95/project.json b/src/WebApplication95/project.json new file mode 100644 index 0000000000..10ee59e745 --- /dev/null +++ b/src/WebApplication95/project.json @@ -0,0 +1,50 @@ +{ + "dependencies": { + "Channels": "0.2.0-beta-*", + "Microsoft.NETCore.App": { + "version": "1.0.0", + "type": "platform" + }, + "Microsoft.AspNetCore.Diagnostics": "1.0.0", + "Microsoft.AspNetCore.StaticFiles": "1.0.0", + + "Microsoft.AspNetCore.Server.IISIntegration": "1.0.0", + "Microsoft.AspNetCore.Server.Kestrel": "1.0.0", + "Microsoft.Extensions.Logging.Console": "1.0.0" + }, + + "tools": { + "Microsoft.AspNetCore.Server.IISIntegration.Tools": "1.0.0-preview2-final" + }, + + "frameworks": { + "netcoreapp1.0": { + "imports": [ + "dotnet5.6", + "portable-net45+win8" + ] + } + }, + + "buildOptions": { + "emitEntryPoint": true, + "preserveCompilationContext": true + }, + + "runtimeOptions": { + "configProperties": { + "System.GC.Server": true + } + }, + + "publishOptions": { + "include": [ + "wwwroot", + "web.config" + ] + }, + + "scripts": { + "postpublish": [ "dotnet publish-iis --publish-folder %publish:OutputPath% --framework %publish:FullTargetFramework%" ] + } +} diff --git a/src/WebApplication95/web.config b/src/WebApplication95/web.config new file mode 100644 index 0000000000..dc0514fca5 --- /dev/null +++ b/src/WebApplication95/web.config @@ -0,0 +1,14 @@ + + + + + + + + + + + + diff --git a/src/WebApplication95/wwwroot/index.html b/src/WebApplication95/wwwroot/index.html new file mode 100644 index 0000000000..a956cdfb66 --- /dev/null +++ b/src/WebApplication95/wwwroot/index.html @@ -0,0 +1,55 @@ + + + + + + + + +

Server Sent Events

+ + + +
    + +
+ + \ No newline at end of file diff --git a/src/WebApplication95/wwwroot/polling.html b/src/WebApplication95/wwwroot/polling.html new file mode 100644 index 0000000000..fc75370f12 --- /dev/null +++ b/src/WebApplication95/wwwroot/polling.html @@ -0,0 +1,61 @@ + + + + + + + + +

Long Polling

+ + + +
    + + \ No newline at end of file From 27ddb7de900f4aa258cd3a6d834d0436661ecf05 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Fri, 30 Sep 2016 02:41:07 -0700 Subject: [PATCH 02/99] More cleanup --- src/WebApplication95/Bus.cs | 16 +-- src/WebApplication95/Dispatcher.cs | 98 ++++++++----------- src/WebApplication95/EndPoint.cs | 21 ++++ .../EndPoints/ChatEndPoint.cs | 51 ++++++++++ src/WebApplication95/IDispatcher.cs | 9 -- src/WebApplication95/LongPolling.cs | 4 +- src/WebApplication95/Routing/PrefixRoute.cs | 61 ++++++++++++ .../Routing/RouteBuilderExtensions.cs | 21 ++++ src/WebApplication95/Startup.cs | 12 ++- src/WebApplication95/project.json | 1 + src/WebApplication95/wwwroot/index.html | 4 +- src/WebApplication95/wwwroot/polling.html | 4 +- 12 files changed, 217 insertions(+), 85 deletions(-) create mode 100644 src/WebApplication95/EndPoint.cs create mode 100644 src/WebApplication95/EndPoints/ChatEndPoint.cs delete mode 100644 src/WebApplication95/IDispatcher.cs create mode 100644 src/WebApplication95/Routing/PrefixRoute.cs create mode 100644 src/WebApplication95/Routing/RouteBuilderExtensions.cs diff --git a/src/WebApplication95/Bus.cs b/src/WebApplication95/Bus.cs index 8a03b6f5f2..b7be6e8818 100644 --- a/src/WebApplication95/Bus.cs +++ b/src/WebApplication95/Bus.cs @@ -4,22 +4,22 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Channels; namespace WebApplication95 { public class Message { - public string ContentType { get; set; } - public ArraySegment Payload { get; set; } + public ReadableBuffer Payload { get; set; } } public class Bus { - private readonly ConcurrentDictionary>> _subscriptions = new ConcurrentDictionary>>(); + private readonly ConcurrentDictionary>> _subscriptions = new ConcurrentDictionary>>(); - public IDisposable Subscribe(string key, IObserver observer) + public IDisposable Subscribe(string key, Func observer) { - var connections = _subscriptions.GetOrAdd(key, _ => new List>()); + var connections = _subscriptions.GetOrAdd(key, _ => new List>()); connections.Add(observer); return new DisposableAction(() => @@ -28,14 +28,14 @@ namespace WebApplication95 }); } - public void Publish(string key, Message message) + public async Task Publish(string key, Message message) { - List> connections; + List> connections; if (_subscriptions.TryGetValue(key, out connections)) { foreach (var c in connections) { - c.OnNext(message); + await c(message); } } } diff --git a/src/WebApplication95/Dispatcher.cs b/src/WebApplication95/Dispatcher.cs index 19dd8e4d9a..be14340db2 100644 --- a/src/WebApplication95/Dispatcher.cs +++ b/src/WebApplication95/Dispatcher.cs @@ -1,22 +1,47 @@ using System; using System.Collections.Generic; -using System.IO; -using System.Linq; using System.Threading.Tasks; using Channels; +using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Primitives; +using WebApplication95.Routing; namespace WebApplication95 { + public static class DispatcherExtensions + { + public static IApplicationBuilder UseRealTimeConnections(this IApplicationBuilder app, Action callback) + { + var dispatcher = new Dispatcher(app); + callback(dispatcher); + app.UseRouter(dispatcher.GetRouter()); + return app; + } + } + public class Dispatcher { private readonly ConnectionManager _manager = new ConnectionManager(); - private readonly EndPoint _endpoint = new EndPoint(); + private readonly RouteBuilder _routes; - public async Task Execute(HttpContext context) + public Dispatcher(IApplicationBuilder app) { - if (context.Request.Path.StartsWithSegments("/send")) + _routes = new RouteBuilder(app); + } + + public void MapEndPoint(string path) where TEndPoint : EndPoint + { + _routes.AddPrefixRoute(path, new RouteHandler(c => Execute(path, c))); + } + + public IRouter GetRouter() => _routes.Build(); + + public async Task Execute(string path, HttpContext context) where TEndPoint : EndPoint + { + if (context.Request.Path.StartsWithSegments(path + "/send")) { var connectionId = context.Request.Query["id"]; @@ -34,17 +59,19 @@ namespace WebApplication95 } else { + var endpoint = (EndPoint)context.RequestServices.GetRequiredService(); + var connectionId = _manager.GetConnectionId(context); // Outgoing channels - if (context.Request.Path.StartsWithSegments("/sse")) + if (context.Request.Path.StartsWithSegments(path + "/sse")) { ConnectionState state; _manager.AddConnection(connectionId, out state); var sse = new ServerSentEvents(state); - var ignore = _endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(state.Connection); state.Connection.TransportType = TransportType.ServerSentEvents; @@ -54,14 +81,14 @@ namespace WebApplication95 _manager.RemoveConnection(connectionId); } - else if (context.Request.Path.StartsWithSegments("/ws")) + else if (context.Request.Path.StartsWithSegments(path + "/ws")) { ConnectionState state; _manager.AddConnection(connectionId, out state); var ws = new WebSockets(state); - var ignore = _endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(state.Connection); state.Connection.TransportType = TransportType.WebSockets; @@ -71,14 +98,15 @@ namespace WebApplication95 _manager.RemoveConnection(connectionId); } - else if (context.Request.Path.StartsWithSegments("/poll")) + else if (context.Request.Path.StartsWithSegments(path + "/poll")) { ConnectionState state; bool newConnection = false; if (_manager.AddConnection(connectionId, out state)) { newConnection = true; - var ignore = _endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(state.Connection); + state.Connection.TransportType = TransportType.LongPolling; } @@ -92,52 +120,4 @@ namespace WebApplication95 } } } - - public class EndPoint - { - private List _connections = new List(); - - public virtual async Task OnConnected(Connection connection) - { - lock (_connections) - { - _connections.Add(connection); - } - - // Echo server - while (true) - { - var input = await connection.Input.ReadAsync(); - try - { - if (input.IsEmpty && connection.Input.Reading.IsCompleted) - { - break; - } - - List connections = null; - lock (_connections) - { - connections = _connections; - } - - foreach (var c in connections) - { - var output = c.Output.Alloc(); - output.Append(ref input); - await output.FlushAsync(); - } - } - finally - { - connection.Input.Advance(input.End); - } - } - - lock (_connections) - { - _connections.Remove(connection); - } - } - } } diff --git a/src/WebApplication95/EndPoint.cs b/src/WebApplication95/EndPoint.cs new file mode 100644 index 0000000000..188d166160 --- /dev/null +++ b/src/WebApplication95/EndPoint.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Channels; + +namespace WebApplication95 +{ + /// + /// Represents an end point that multiple connections connect to. For HTTP, endpoints are URLs, for non HTTP it can be a TCP listener (or similar) + /// + public class EndPoint + { + // This is a stream based API, we might just want to change to a message based API or invent framing + // over this stream based API to do a message based API + public virtual Task OnConnected(Connection connection) + { + return Task.CompletedTask; + } + } +} diff --git a/src/WebApplication95/EndPoints/ChatEndPoint.cs b/src/WebApplication95/EndPoints/ChatEndPoint.cs new file mode 100644 index 0000000000..9ded2f2a87 --- /dev/null +++ b/src/WebApplication95/EndPoints/ChatEndPoint.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Channels; + +namespace WebApplication95.EndPoints +{ + public class ChatEndPoint : EndPoint + { + private Bus bus = new Bus(); + + public override async Task OnConnected(Connection connection) + { + using (bus.Subscribe(nameof(ChatEndPoint), message => OnMessage(message, connection))) + { + while (true) + { + var input = await connection.Input.ReadAsync(); + try + { + if (input.IsEmpty && connection.Input.Reading.IsCompleted) + { + break; + } + + await bus.Publish(nameof(ChatEndPoint), new Message() + { + Payload = input + }); + } + finally + { + connection.Input.Advance(input.End); + } + } + } + + connection.Input.CompleteReader(); + } + + private async Task OnMessage(Message message, Connection connection) + { + var buffer = connection.Output.Alloc(); + var payload = message.Payload; + buffer.Append(ref payload); + await buffer.FlushAsync(); + } + } + +} diff --git a/src/WebApplication95/IDispatcher.cs b/src/WebApplication95/IDispatcher.cs deleted file mode 100644 index 2beada37bb..0000000000 --- a/src/WebApplication95/IDispatcher.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace WebApplication95 -{ - public interface IDispatcher - { - void OnIncoming(ArraySegment data); - } -} \ No newline at end of file diff --git a/src/WebApplication95/LongPolling.cs b/src/WebApplication95/LongPolling.cs index 7a6da27114..be1bc6c217 100644 --- a/src/WebApplication95/LongPolling.cs +++ b/src/WebApplication95/LongPolling.cs @@ -75,8 +75,10 @@ namespace WebApplication95 if (memory.TryGetArray(out data)) { await Send(data); + // Advance the buffer one block of memory - _state.Connection.Output.Advance(buffer.Slice(memory.Length).Start); + buffer = buffer.Slice(memory.Length); + _state.Connection.Output.Advance(buffer.Start); break; } } diff --git a/src/WebApplication95/Routing/PrefixRoute.cs b/src/WebApplication95/Routing/PrefixRoute.cs new file mode 100644 index 0000000000..177f7a1af2 --- /dev/null +++ b/src/WebApplication95/Routing/PrefixRoute.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Routing; + +namespace WebApplication95.Routing +{ + public class PrefixRoute : IRouter + { + private readonly IRouteHandler _target; + private readonly string _prefix; + + public PrefixRoute(IRouteHandler target, string prefix) + { + _target = target; + + if (prefix == null) + { + prefix = "/"; + } + else if (prefix.Length > 0 && prefix[0] != '/') + { + // owin.RequestPath starts with a / + prefix = "/" + prefix; + } + + if (prefix.Length > 1 && prefix[prefix.Length - 1] == '/') + { + prefix = prefix.Substring(0, prefix.Length - 1); + } + + _prefix = prefix; + } + + public Task RouteAsync(RouteContext context) + { + var requestPath = context.HttpContext.Request.Path.Value ?? string.Empty; + if (requestPath.StartsWith(_prefix, StringComparison.OrdinalIgnoreCase)) + { + if (requestPath.Length > _prefix.Length) + { + var lastCharacter = requestPath[_prefix.Length]; + if (lastCharacter != '/' && lastCharacter != '#' && lastCharacter != '?') + { + return Task.FromResult(0); + } + } + + context.Handler = _target.GetRequestHandler(context.HttpContext, context.RouteData); + } + + return Task.FromResult(0); + } + + public VirtualPathData GetVirtualPath(VirtualPathContext context) + { + return null; + } + } +} diff --git a/src/WebApplication95/Routing/RouteBuilderExtensions.cs b/src/WebApplication95/Routing/RouteBuilderExtensions.cs new file mode 100644 index 0000000000..45d253efd8 --- /dev/null +++ b/src/WebApplication95/Routing/RouteBuilderExtensions.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Routing; + +namespace WebApplication95.Routing +{ + + public static class RouteBuilderExtensions + { + public static IRouteBuilder AddPrefixRoute( + this IRouteBuilder routeBuilder, + string prefix, + IRouteHandler handler) + { + routeBuilder.Routes.Add(new PrefixRoute(handler, prefix)); + return routeBuilder; + } + } +} diff --git a/src/WebApplication95/Startup.cs b/src/WebApplication95/Startup.cs index 8ddf30b1fa..cabfd9d7cc 100644 --- a/src/WebApplication95/Startup.cs +++ b/src/WebApplication95/Startup.cs @@ -1,7 +1,10 @@ using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using WebApplication95.EndPoints; +using WebApplication95.Routing; namespace WebApplication95 { @@ -11,6 +14,9 @@ namespace WebApplication95 // For more information on how to configure your application, visit http://go.microsoft.com/fwlink/?LinkID=398940 public void ConfigureServices(IServiceCollection services) { + services.AddRouting(); + + services.AddSingleton(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. @@ -25,11 +31,9 @@ namespace WebApplication95 app.UseDeveloperExceptionPage(); } - var dispatcher = new Dispatcher(); - - app.Run(async (context) => + app.UseRealTimeConnections(d => { - await dispatcher.Execute(context); + d.MapEndPoint("/chat"); }); } } diff --git a/src/WebApplication95/project.json b/src/WebApplication95/project.json index 10ee59e745..241e3ec896 100644 --- a/src/WebApplication95/project.json +++ b/src/WebApplication95/project.json @@ -5,6 +5,7 @@ "version": "1.0.0", "type": "platform" }, + "Microsoft.AspNetCore.Routing": "1.0.0", "Microsoft.AspNetCore.Diagnostics": "1.0.0", "Microsoft.AspNetCore.StaticFiles": "1.0.0", diff --git a/src/WebApplication95/wwwroot/index.html b/src/WebApplication95/wwwroot/index.html index a956cdfb66..23d0c1e09f 100644 --- a/src/WebApplication95/wwwroot/index.html +++ b/src/WebApplication95/wwwroot/index.html @@ -11,7 +11,7 @@ var body = document.getElementById('data').value; var xhr = new XMLHttpRequest(); - var url = '/send?id=' + connectionId; + var url = '/chat/send?id=' + connectionId; xhr.open("POST", url, true); xhr.setRequestHeader('Content-type', 'application/json'); xhr.onreadystatechange = function () { @@ -22,7 +22,7 @@ xhr.send(data); } - var source = new EventSource('/sse'); + var source = new EventSource('/chat/sse'); source.onopen = function () { console.log('Opened!'); diff --git a/src/WebApplication95/wwwroot/polling.html b/src/WebApplication95/wwwroot/polling.html index fc75370f12..29a5e4b792 100644 --- a/src/WebApplication95/wwwroot/polling.html +++ b/src/WebApplication95/wwwroot/polling.html @@ -11,7 +11,7 @@ var body = document.getElementById('data').value; var xhr = new XMLHttpRequest(); - var url = '/send?id=' + connectionId; + var url = '/chat/send?id=' + connectionId; xhr.open("POST", url, true); xhr.setRequestHeader('Content-type', 'application/json'); xhr.onreadystatechange = function () { @@ -25,7 +25,7 @@ function poll(id) { var xhr = new XMLHttpRequest(); - var url = '/poll' + (id == null ? '' : '?id=' + id); + var url = '/chat/poll' + (id == null ? '' : '?id=' + id); xhr.open("POST", url, true); xhr.onreadystatechange = function () { if (xhr.readyState == 4 && xhr.status == 200) { From 32ed7ca0c4a532c6e87d782ebd694ba2e1859f28 Mon Sep 17 00:00:00 2001 From: moozzyk Date: Fri, 30 Sep 2016 14:21:37 -0700 Subject: [PATCH 03/99] SSE kind of works - duplicate messages --- src/WebApplication95/ConnectionManager.cs | 3 +- src/WebApplication95/Dispatcher.cs | 112 ++++++++++++++-------- src/WebApplication95/LongPolling.cs | 30 +----- src/WebApplication95/ServerSentEvents.cs | 4 +- src/WebApplication95/WebSockets.cs | 1 + src/WebApplication95/wwwroot/index.html | 94 +++++++++++------- 6 files changed, 141 insertions(+), 103 deletions(-) diff --git a/src/WebApplication95/ConnectionManager.cs b/src/WebApplication95/ConnectionManager.cs index 7acf181a93..a558bc0b82 100644 --- a/src/WebApplication95/ConnectionManager.cs +++ b/src/WebApplication95/ConnectionManager.cs @@ -45,9 +45,10 @@ namespace WebApplication95 } } + + // TODO: don't leak HttpContext to ConnectionManager public string GetConnectionId(HttpContext context) { - // REVIEW: Only check the query string for longpolling var id = context.Request.Query["id"]; if (!StringValues.IsNullOrEmpty(id)) diff --git a/src/WebApplication95/Dispatcher.cs b/src/WebApplication95/Dispatcher.cs index be14340db2..ca419e94ad 100644 --- a/src/WebApplication95/Dispatcher.cs +++ b/src/WebApplication95/Dispatcher.cs @@ -41,83 +41,117 @@ namespace WebApplication95 public async Task Execute(string path, HttpContext context) where TEndPoint : EndPoint { - if (context.Request.Path.StartsWithSegments(path + "/send")) + if (context.Request.Path.StartsWithSegments(path + "/getid")) { - var connectionId = context.Request.Query["id"]; - - if (StringValues.IsNullOrEmpty(connectionId)) - { - throw new InvalidOperationException("Missing connection id"); - } - - ConnectionState state; - if (_manager.TryGetConnection(connectionId, out state)) - { - // Write the message length - await context.Request.Body.CopyToAsync(state.Connection.Input); - } + await ProcessGetId(context); + } + else if (context.Request.Path.StartsWithSegments(path + "/send")) + { + await ProcessSend(context); } else { - var endpoint = (EndPoint)context.RequestServices.GetRequiredService(); - var connectionId = _manager.GetConnectionId(context); + var endpoint = (EndPoint)context.RequestServices.GetRequiredService(); // Outgoing channels if (context.Request.Path.StartsWithSegments(path + "/sse")) { - ConnectionState state; - _manager.AddConnection(connectionId, out state); + var connectionState = GetOrCreateConnection(context); + var sse = new ServerSentEvents(connectionState); - var sse = new ServerSentEvents(state); + var ignore = endpoint.OnConnected(connectionState.Connection); - var ignore = endpoint.OnConnected(state.Connection); - - state.Connection.TransportType = TransportType.ServerSentEvents; + connectionState.Connection.TransportType = TransportType.ServerSentEvents; await sse.ProcessRequest(context); - state.Connection.Complete(); + connectionState.Connection.Complete(); - _manager.RemoveConnection(connectionId); + _manager.RemoveConnection(connectionState.Connection.ConnectionId); } else if (context.Request.Path.StartsWithSegments(path + "/ws")) { - ConnectionState state; - _manager.AddConnection(connectionId, out state); + var connectionState = GetOrCreateConnection(context); + var ws = new WebSockets(connectionState); - var ws = new WebSockets(state); + var ignore = endpoint.OnConnected(connectionState.Connection); - var ignore = endpoint.OnConnected(state.Connection); - - state.Connection.TransportType = TransportType.WebSockets; + connectionState.Connection.TransportType = TransportType.WebSockets; await ws.ProcessRequest(context); - state.Connection.Complete(); + connectionState.Connection.Complete(); - _manager.RemoveConnection(connectionId); + _manager.RemoveConnection(connectionState.Connection.ConnectionId); } else if (context.Request.Path.StartsWithSegments(path + "/poll")) { - ConnectionState state; + var connectionId = context.Request.Query["id"]; + ConnectionState connectionState; bool newConnection = false; - if (_manager.AddConnection(connectionId, out state)) + if (_manager.AddConnection(connectionId, out connectionState)) { newConnection = true; - var ignore = endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(connectionState.Connection); - state.Connection.TransportType = TransportType.LongPolling; + connectionState.Connection.TransportType = TransportType.LongPolling; } - var longPolling = new LongPolling(state); + var longPolling = new LongPolling(connectionState); await longPolling.ProcessRequest(newConnection, context); - _manager.MarkConnectionDead(connectionId); + _manager.MarkConnectionDead(connectionState.Connection.ConnectionId); } - } } + + private async Task ProcessGetId(HttpContext context) + { + var connectionId = _manager.GetConnectionId(context); + ConnectionState state; + _manager.AddConnection(connectionId, out state); + context.Response.Headers["X-SignalR-ConnectionId"] = connectionId; + await context.Response.WriteAsync($"{{ \"connectionId\": \"{connectionId}\" }}"); + return; + } + + private async Task ProcessSend(HttpContext context) + { + var connectionId = context.Request.Query["id"]; + if (StringValues.IsNullOrEmpty(connectionId)) + { + throw new InvalidOperationException("Missing connection id"); + } + + ConnectionState state; + if (_manager.TryGetConnection(connectionId, out state)) + { + // Write the message length + await context.Request.Body.CopyToAsync(state.Connection.Input); + } + } + + private ConnectionState GetOrCreateConnection(HttpContext context) + { + var connectionId = context.Request.Query["id"]; + ConnectionState connectionState; + + if (StringValues.IsNullOrEmpty(connectionId)) + { + connectionId = _manager.GetConnectionId(context); + _manager.AddConnection(connectionId, out connectionState); + } + else + { + if (!_manager.TryGetConnection(connectionId, out connectionState)) + { + throw new InvalidOperationException("Unknown connection id"); + } + } + + return connectionState; + } } } diff --git a/src/WebApplication95/LongPolling.cs b/src/WebApplication95/LongPolling.cs index be1bc6c217..b47e9654ee 100644 --- a/src/WebApplication95/LongPolling.cs +++ b/src/WebApplication95/LongPolling.cs @@ -104,33 +104,9 @@ namespace WebApplication95 await Post(async state => { var data = ((ArraySegment)state); - // + 100 = laziness - var buffer = new byte[data.Count + _state.Connection.ConnectionId.Length + 100]; - var at = 0; - buffer[at++] = (byte)'{'; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)'c'; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)':'; - buffer[at++] = (byte)'"'; - int count = Encoding.UTF8.GetBytes(_state.Connection.ConnectionId, 0, _state.Connection.ConnectionId.Length, buffer, at); - at += count; - buffer[at++] = (byte)'"'; - if (data.Array != null) - { - buffer[at++] = (byte)','; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)'d'; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)':'; - //buffer[at++] = (byte)'"'; - Buffer.BlockCopy(data.Array, data.Offset, buffer, at, data.Count); - } - at += data.Count; - //buffer[at++] = (byte)'"'; - buffer[at++] = (byte)'}'; - _context.Response.ContentLength = at; - await _context.Response.Body.WriteAsync(buffer, 0, at); + _context.Response.Headers["X-SignalR-ConnectionId"] = _state.Connection.ConnectionId; + _context.Response.ContentLength = data.Count; + await _context.Response.Body.WriteAsync(data.Array, 0, data.Count); }, value); diff --git a/src/WebApplication95/ServerSentEvents.cs b/src/WebApplication95/ServerSentEvents.cs index 0aaf8287ee..d146e9ed54 100644 --- a/src/WebApplication95/ServerSentEvents.cs +++ b/src/WebApplication95/ServerSentEvents.cs @@ -40,14 +40,12 @@ namespace WebApplication95 public async Task ProcessRequest(HttpContext context) { context.Response.ContentType = "text/event-stream"; + context.Response.Headers["Cache-Control"] = "no-cache"; // End the connection if the client goes away context.RequestAborted.Register(state => OnConnectionAborted(state), this); - _context = context; - await _context.Response.WriteAsync($"data: {_state.Connection.ConnectionId}\n\n"); - // Set the initial TCS when everything is setup _initTcs.TrySetResult(null); diff --git a/src/WebApplication95/WebSockets.cs b/src/WebApplication95/WebSockets.cs index a9ab616d96..b4e57ee148 100644 --- a/src/WebApplication95/WebSockets.cs +++ b/src/WebApplication95/WebSockets.cs @@ -78,6 +78,7 @@ namespace WebApplication95 else if (result.MessageType == WebSocketMessageType.Close) { await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); + // TODO: needs to remove itself from connection mamanger? break; } } diff --git a/src/WebApplication95/wwwroot/index.html b/src/WebApplication95/wwwroot/index.html index 23d0c1e09f..20e02eaa7d 100644 --- a/src/WebApplication95/wwwroot/index.html +++ b/src/WebApplication95/wwwroot/index.html @@ -4,49 +4,77 @@

    Server Sent Events

    - +