Remove references to OnReaderCompleted and OnWriterCompleted (#13018)

This commit is contained in:
Mikael Mengistu 2019-08-09 19:04:33 -07:00 committed by GitHub
parent 09db3ebbae
commit 8ef285620c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 18 additions and 86 deletions

View File

@ -316,8 +316,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
await connection.Transport.Output.WriteAsync(new byte[] { 0x42 }).OrTimeout(); await connection.Transport.Output.WriteAsync(new byte[] { 0x42 }).OrTimeout();
// We should get the exception in the transport input completion. await Assert.ThrowsAsync<HttpRequestException>(async () => await connection.Transport.Input.ReadAsync());
await Assert.ThrowsAsync<HttpRequestException>(() => connection.Transport.Input.WaitForWriterToComplete()).OrTimeout();
}); });
} }
} }

View File

@ -367,10 +367,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var connectionClosed = new TaskCompletionSource<object>(); var connectionClosed = new TaskCompletionSource<object>();
await AsyncUsing(CreateHubConnection(testConnection), async connection => await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{ {
#pragma warning disable 0618
// We're hooking the TestConnection shutting down here because the HubConnection one will be blocked on the lock
testConnection.Transport.Input.OnWriterCompleted((_, __) => testConnectionClosed.TrySetResult(null), null);
#pragma warning restore
connection.Closed += (e) => connection.Closed += (e) =>
{ {
connectionClosed.TrySetResult(null); connectionClosed.TrySetResult(null);
@ -385,7 +381,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
testConnection.CompleteFromTransport(); testConnection.CompleteFromTransport();
// Wait for the connection to close. // Wait for the connection to close.
await testConnectionClosed.Task.OrTimeout(); await testConnection.Transport.Input.CompleteAsync();
// The stop should be completed. // The stop should be completed.
await stopTask.OrTimeout(); await stopTask.OrTimeout();

View File

@ -52,15 +52,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var pair = DuplexPipe.CreateConnectionPair(options, options); var pair = DuplexPipe.CreateConnectionPair(options, options);
Application = pair.Application; Application = pair.Application;
Transport = pair.Transport; Transport = pair.Transport;
// TODO: Resolve this, for now we use Pipe which works
#pragma warning disable 0618
Application.Input.OnWriterCompleted((ex, _) =>
{
Application.Output.Complete();
},
null);
#pragma warning restore 0618
} }
public override ValueTask DisposeAsync() => DisposeCoreAsync(); public override ValueTask DisposeAsync() => DisposeCoreAsync();
@ -156,6 +147,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
} }
else if (result.IsCompleted) else if (result.IsCompleted)
{ {
await Application.Output.CompleteAsync();
return null; return null;
} }
} }

View File

@ -1,8 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved. // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
#pragma warning disable CS0618 // TODO: Remove when we replace the events
using System; using System;
using System.Buffers; using System.Buffers;
using System.IO.Pipelines; using System.IO.Pipelines;
@ -100,11 +98,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
var transportInputTcs = new TaskCompletionSource<object>(); var transportInputTcs = new TaskCompletionSource<object>();
var transportOutputTcs = new TaskCompletionSource<object>(); var transportOutputTcs = new TaskCompletionSource<object>();
connection.Transport.Input.OnWriterCompleted((_, __) => transportInputTcs.TrySetResult(null), null);
connection.Transport.Output.OnReaderCompleted((_, __) => transportOutputTcs.TrySetResult(null), null);
connection.Application.Input.OnWriterCompleted((_, __) => applicationInputTcs.TrySetResult(null), null);
connection.Application.Output.OnReaderCompleted((_, __) => applicationOutputTcs.TrySetResult(null), null);
try try
{ {
await connection.DisposeAsync(closeGracefully).OrTimeout(); await connection.DisposeAsync(closeGracefully).OrTimeout();
@ -114,7 +107,17 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
// Ignore the exception that bubbles out of the failing task // Ignore the exception that bubbles out of the failing task
} }
await Task.WhenAll(applicationInputTcs.Task, applicationOutputTcs.Task, transportInputTcs.Task, transportOutputTcs.Task).OrTimeout(); var result = await connection.Transport.Output.FlushAsync();
Assert.True(result.IsCompleted);
result = await connection.Application.Output.FlushAsync();
Assert.True(result.IsCompleted);
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () => await connection.Transport.Input.ReadAsync());
Assert.Equal("Reading is not allowed after reader was completed.", exception.Message);
exception = await Assert.ThrowsAsync<InvalidOperationException>(async () => await connection.Application.Input.ReadAsync());
Assert.Equal("Reading is not allowed after reader was completed.", exception.Message);
} }
} }
@ -342,16 +345,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
var connection = connectionManager.CreateConnection(PipeOptions.Default, PipeOptions.Default); var connection = connectionManager.CreateConnection(PipeOptions.Default, PipeOptions.Default);
connection.Application.Output.OnReaderCompleted((error, state) =>
{
tcs.TrySetResult(null);
},
null);
appLifetime.StopApplication(); appLifetime.StopApplication();
// Connection should be disposed so this should complete immediately var result = await connection.Application.Output.FlushAsync();
await tcs.Task.OrTimeout(); Assert.True(result.IsCompleted);
} }
} }
@ -368,16 +365,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
var connection = connectionManager.CreateConnection(PipeOptions.Default, PipeOptions.Default); var connection = connectionManager.CreateConnection(PipeOptions.Default, PipeOptions.Default);
connection.Application.Output.OnReaderCompleted((error, state) =>
{
tcs.TrySetResult(null);
},
null);
appLifetime.StopApplication(); appLifetime.StopApplication();
// Connection should be disposed so this should complete immediately var result = await connection.Application.Output.FlushAsync();
await tcs.Task.OrTimeout(); Assert.True(result.IsCompleted);
} }
} }

View File

@ -1,46 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
#pragma warning disable 0618 // TODO: Remove dependency on pipe events
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
public static class PipeCompletionExtensions
{
public static Task WaitForWriterToComplete(this PipeReader reader)
{
var tcs = new TaskCompletionSource<object>();
reader.OnWriterCompleted((ex, state) =>
{
if (ex != null)
{
((TaskCompletionSource<object>)state).TrySetException(ex);
}
else
{
((TaskCompletionSource<object>)state).TrySetResult(null);
}
}, tcs);
return tcs.Task;
}
public static Task WaitForReaderToComplete(this PipeWriter writer)
{
var tcs = new TaskCompletionSource<object>();
writer.OnReaderCompleted((ex, state) =>
{
if (ex != null)
{
((TaskCompletionSource<object>)state).TrySetException(ex);
}
else
{
((TaskCompletionSource<object>)state).TrySetResult(null);
}
}, tcs);
return tcs.Task;
}
}
}