Adding rasing Connected and Closed events

This commit is contained in:
moozzyk 2017-02-08 17:04:48 -08:00
parent 966470d269
commit 4f6306f352
2 changed files with 182 additions and 1 deletions

View File

@ -25,6 +25,10 @@ namespace Microsoft.AspNetCore.Sockets.Client
public Uri Url { get; }
public event Action Connected;
public event Action<byte[], Format> Received;
public event Action<Exception> Closed;
public Connection(Uri url)
: this(url, null)
{ }
@ -64,6 +68,13 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
Interlocked.Exchange(ref _connectionState, ConnectionState.Connected);
// Do not "simplify" - events can be removed from a different thread
var connectedEventHandler = Connected;
if (connectedEventHandler != null)
{
connectedEventHandler();
}
}
private static async Task<Uri> GetConnectUrl(Uri url, HttpClient httpClient, ILogger logger)
@ -110,7 +121,17 @@ namespace Microsoft.AspNetCore.Sockets.Client
_transportChannel = new ChannelConnection<Message>(applicationToTransport, transportToApplication);
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Input.Completion.ContinueWith(t => Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected));
Input.Completion.ContinueWith(t =>
{
Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected);
// Do not "simplify" - events can be removed from a different thread
var closedEventHandler = Closed;
if (closedEventHandler != null)
{
closedEventHandler(t.IsFaulted ? t.Exception.InnerException : null);
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
// Start the transport, giving it one end of the pipeline

View File

@ -145,6 +145,166 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary));
}
[Fact]
public async Task ConnectedEventRaisedWhenTheClientIsConnected()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var connectedEventRaised = false;
connection.Connected += () => connectedEventRaised = true;
await connection.StartAsync(longPollingTransport, httpClient);
Assert.True(connectedEventRaised);
}
}
[Fact]
public async Task ConnectedEventNotRaisedWhenTransportFailsToStart()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
var mockTransport = new Mock<ITransport>();
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<IChannelConnection<Message>>()))
.Returns(Task.FromException(new InvalidOperationException("Transport failed to start")));
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var connectedEventRaised = false;
connection.Connected += () => connectedEventRaised = true;
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(mockTransport.Object, httpClient));
Assert.False(connectedEventRaised);
}
}
[Fact(Skip = "Need draining to make it work. Receive event may fix that.")]
public async Task ClosedEventRaisedWhenTheClientIsStopped()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.SetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StopAsync();
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
// in case of clean disconnect error should be null
Assert.Null(await closedEventTcs.Task);
}
}
[Fact(Skip = "Need draining to make it work. Receive event may fix that.")]
public async Task ClosedEventRaisedWhenTheClientIsDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var connection = new Connection(new Uri("http://fakeuri.org/"));
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.TrySetResult(e);
using (connection)
{
await connection.StartAsync(longPollingTransport, httpClient);
}
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
Assert.Null(await closedEventTcs.Task);
}
}
[Fact]
public async Task ClosedEventRaisedWhenConnectionToServerLost()
{
var allowPollTcs = new TaskCompletionSource<object>();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (request.RequestUri.AbsolutePath.EndsWith("/poll"))
{
await allowPollTcs.Task;
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) };
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
var receiveTask = connection.ReceiveAsync(new ReceiveData());
allowPollTcs.TrySetResult(null);
await Assert.ThrowsAsync<HttpRequestException>(async () => await receiveTask);
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
Assert.IsType<HttpRequestException>(await closedEventTcs.Task);
}
}
[Fact]
public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted()
{
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
bool closedEventRaised = false;
connection.Closed += e => closedEventRaised = true;
await connection.StopAsync();
Assert.False(closedEventRaised);
}
}
[Fact]
public async Task TransportIsStoppedWhenConnectionIsStopped()
{