Check for actual start in SSE (#1681)

This commit is contained in:
Mikael Mengistu 2018-03-22 19:03:48 +00:00 committed by GitHub
parent b111c91cb0
commit b5c46f35b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 4 deletions

View File

@ -54,8 +54,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
Log.StartTransport(_logger, transferFormat);
var startTcs = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
var sendTask = SendUtils.SendMessages(url, _application, _httpClient, _httpOptions, _transportCts, _logger);
var receiveTask = OpenConnection(_application, url, _transportCts.Token);
var receiveTask = OpenConnection(_application, url, startTcs, _transportCts.Token);
Running = Task.WhenAll(sendTask, receiveTask).ContinueWith(t =>
{
@ -66,17 +67,30 @@ namespace Microsoft.AspNetCore.Sockets.Client
return t;
}).Unwrap();
return Task.CompletedTask;
return startTcs.Task;
}
private async Task OpenConnection(IDuplexPipe application, Uri url, CancellationToken cancellationToken)
private async Task OpenConnection(IDuplexPipe application, Uri url, TaskCompletionSource<object> startTcs, CancellationToken cancellationToken)
{
Log.StartReceive(_logger);
var request = new HttpRequestMessage(HttpMethod.Get, url);
SendUtils.PrepareHttpRequest(request, _httpOptions);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
HttpResponseMessage response;
try
{
response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();
startTcs.TrySetResult(null);
}
catch (Exception ex)
{
Log.TransportStopping(_logger);
startTcs.TrySetException(ex);
return;
}
using (var stream = await response.Content.ReadAsStreamAsync())
{

View File

@ -370,6 +370,59 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
}
[Fact]
public async Task SSEWontStartIfSuccessfulConnectionIsNotEstablished()
{
using (StartLog(out var loggerFactory))
{
var httpHandler = new TestHttpMessageHandler();
httpHandler.OnGet("/?id=00000000-0000-0000-0000-000000000000", (_, __) =>
{
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError));
});
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler));
await WithConnectionAsync(
CreateConnection(httpHandler, loggerFactory: loggerFactory, url: null, transport: sse),
async (connection, closed) =>
{
await Assert.ThrowsAsync<InvalidOperationException>(
() => connection.StartAsync(TransferFormat.Text).OrTimeout());
});
}
}
[Fact]
public async Task SSEWaitsForResponseToStart()
{
using (StartLog(out var loggerFactory))
{
var httpHandler = new TestHttpMessageHandler();
var connectResponseTcs = new TaskCompletionSource<object>();
httpHandler.OnGet("/?id=00000000-0000-0000-0000-000000000000", async (_, __) =>
{
await connectResponseTcs.Task;
return ResponseUtils.CreateResponse(HttpStatusCode.Accepted);
});
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler));
await WithConnectionAsync(
CreateConnection(httpHandler, loggerFactory: loggerFactory, url: null, transport: sse),
async (connection, closed) =>
{
var startTask = connection.StartAsync(TransferFormat.Text).OrTimeout();
Assert.False(connectResponseTcs.Task.IsCompleted);
Assert.False(startTask.IsCompleted);
connectResponseTcs.TrySetResult(null);
await startTask;
});
}
}
[Fact]
public async Task TransportIsStoppedWhenConnectionIsStopped()
{