// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; using System.IO; using System.Net.Http; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.SignalR.Tests.Common; using Microsoft.AspNetCore.Sockets.Client; using Microsoft.AspNetCore.Sockets.Internal; using Moq; using Moq.Protected; using Xunit; namespace Microsoft.AspNetCore.SignalR.Client.Tests { public class ServerSentEventsTransportTests { [Fact] public async Task CanStartStopSSETransport() { var eventStreamTcs = new TaskCompletionSource(); var copyToAsyncTcs = new TaskCompletionSource(); var mockHttpHandler = new Mock(); mockHttpHandler.Protected() .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) .Returns(async (request, cancellationToken) => { await Task.Yield(); // Receive loop started - allow stopping the transport eventStreamTcs.SetResult(null); // returns unfinished task to block pipelines var mockStream = new Mock(); mockStream .Setup(s => s.CopyToAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Returns(copyToAsyncTcs.Task); return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) }; }); try { using (var httpClient = new HttpClient(mockHttpHandler.Object)) { var sseTransport = new ServerSentEventsTransport(httpClient); var connectionToTransport = Channel.CreateUnbounded(); var transportToConnection = Channel.CreateUnbounded(); var channelConnection = new ChannelConnection(connectionToTransport, transportToConnection); await sseTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection).OrTimeout(); await eventStreamTcs.Task.OrTimeout(); await sseTransport.StopAsync().OrTimeout(); await sseTransport.Running.OrTimeout(); } } finally { copyToAsyncTcs.SetResult(0); } } } }