From 4f6306f352afd1aa1dc39d5039cd9c27ab4bb652 Mon Sep 17 00:00:00 2001 From: moozzyk Date: Wed, 8 Feb 2017 17:04:48 -0800 Subject: [PATCH] Adding rasing Connected and Closed events --- .../Connection.cs | 23 ++- .../ConnectionTests.cs | 160 ++++++++++++++++++ 2 files changed, 182 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs index 0c6a6ad859..1876b4e52e 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs @@ -25,6 +25,10 @@ namespace Microsoft.AspNetCore.Sockets.Client public Uri Url { get; } + public event Action Connected; + public event Action Received; + public event Action Closed; + public Connection(Uri url) : this(url, null) { } @@ -64,6 +68,13 @@ namespace Microsoft.AspNetCore.Sockets.Client } Interlocked.Exchange(ref _connectionState, ConnectionState.Connected); + + // Do not "simplify" - events can be removed from a different thread + var connectedEventHandler = Connected; + if (connectedEventHandler != null) + { + connectedEventHandler(); + } } private static async Task GetConnectUrl(Uri url, HttpClient httpClient, ILogger logger) @@ -110,7 +121,17 @@ namespace Microsoft.AspNetCore.Sockets.Client _transportChannel = new ChannelConnection(applicationToTransport, transportToApplication); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Input.Completion.ContinueWith(t => Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected)); + Input.Completion.ContinueWith(t => + { + Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected); + + // Do not "simplify" - events can be removed from a different thread + var closedEventHandler = Closed; + if (closedEventHandler != null) + { + closedEventHandler(t.IsFaulted ? t.Exception.InnerException : null); + } + }); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed // Start the transport, giving it one end of the pipeline diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs index 38651dc50c..9dc0bad723 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs @@ -145,6 +145,166 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary)); } + [Fact] + public async Task ConnectedEventRaisedWhenTheClientIsConnected() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) + using (var connection = new Connection(new Uri("http://fakeuri.org/"))) + { + var connectedEventRaised = false; + connection.Connected += () => connectedEventRaised = true; + + await connection.StartAsync(longPollingTransport, httpClient); + + Assert.True(connectedEventRaised); + } + } + + [Fact] + public async Task ConnectedEventNotRaisedWhenTransportFailsToStart() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + var mockTransport = new Mock(); + mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) + .Returns(Task.FromException(new InvalidOperationException("Transport failed to start"))); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (var connection = new Connection(new Uri("http://fakeuri.org/"))) + { + var connectedEventRaised = false; + connection.Connected += () => connectedEventRaised = true; + + await Assert.ThrowsAsync( + async () => await connection.StartAsync(mockTransport.Object, httpClient)); + + Assert.False(connectedEventRaised); + } + } + + [Fact(Skip = "Need draining to make it work. Receive event may fix that.")] + public async Task ClosedEventRaisedWhenTheClientIsStopped() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) + using (var connection = new Connection(new Uri("http://fakeuri.org/"))) + { + var closedEventTcs = new TaskCompletionSource(); + connection.Closed += e => closedEventTcs.SetResult(e); + + await connection.StartAsync(longPollingTransport, httpClient); + await connection.StopAsync(); + + + Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task)); + // in case of clean disconnect error should be null + Assert.Null(await closedEventTcs.Task); + } + } + + [Fact(Skip = "Need draining to make it work. Receive event may fix that.")] + public async Task ClosedEventRaisedWhenTheClientIsDisposed() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) + { + var connection = new Connection(new Uri("http://fakeuri.org/")); + var closedEventTcs = new TaskCompletionSource(); + connection.Closed += e => closedEventTcs.TrySetResult(e); + + using (connection) + { + await connection.StartAsync(longPollingTransport, httpClient); + } + + Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task)); + Assert.Null(await closedEventTcs.Task); + } + } + + [Fact] + public async Task ClosedEventRaisedWhenConnectionToServerLost() + { + var allowPollTcs = new TaskCompletionSource(); + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + if (request.RequestUri.AbsolutePath.EndsWith("/poll")) + { + await allowPollTcs.Task; + return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) }; + } + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) + using (var connection = new Connection(new Uri("http://fakeuri.org/"))) + { + var closedEventTcs = new TaskCompletionSource(); + connection.Closed += e => closedEventTcs.TrySetResult(e); + await connection.StartAsync(longPollingTransport, httpClient); + + var receiveTask = connection.ReceiveAsync(new ReceiveData()); + allowPollTcs.TrySetResult(null); + await Assert.ThrowsAsync(async () => await receiveTask); + + Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task)); + Assert.IsType(await closedEventTcs.Task); + } + } + + [Fact] + public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted() + { + using (var connection = new Connection(new Uri("http://fakeuri.org/"))) + { + bool closedEventRaised = false; + connection.Closed += e => closedEventRaised = true; + + await connection.StopAsync(); + Assert.False(closedEventRaised); + } + } + [Fact] public async Task TransportIsStoppedWhenConnectionIsStopped() {