Fix #324 by adding streaming support to the server (#461)

This commit is contained in:
Andrew Stanton-Nurse 2017-05-31 09:10:20 -07:00 committed by GitHub
parent 39fd6974f7
commit 263dd0e4fe
41 changed files with 1760 additions and 397 deletions

View File

@ -1,12 +1,17 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26510.0
VisualStudioVersion = 15.0.26526.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-5398-4884-87E4-B816698CDE65}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{83B2C3EB-A3D8-4E6F-9A3C-A380B005EF31}"
ProjectSection(SolutionItems) = preProject
build\common.props = build\common.props
build\dependencies.props = build\dependencies.props
build\Key.snk = build\Key.snk
NuGet.config = NuGet.config
build\repo.props = build\repo.props
build\repo.targets = build\repo.targets
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{C4BC9889-B49F-41B6-806B-F84941B2549B}"
@ -64,6 +69,8 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Common", "Common", "{6CEC3DC2-5B01-45A8-8F0D-8531315DA90B}"
ProjectSection(SolutionItems) = preProject
test\Common\ArrayOutput.cs = test\Common\ArrayOutput.cs
test\Common\ByteArrayExtensions.cs = test\Common\ByteArrayExtensions.cs
test\Common\ChannelExtensions.cs = test\Common\ChannelExtensions.cs
test\Common\TaskExtensions.cs = test\Common\TaskExtensions.cs
EndProjectSection
EndProject

View File

@ -13,6 +13,7 @@
<RuntimeFrameworkVersion Condition="'$(TargetFramework)'=='netcoreapp2.0'">2.0.0-*</RuntimeFrameworkVersion>
<TestSdkVersion>15.3.0-*</TestSdkVersion>
<XunitVersion>2.3.0-beta2-*</XunitVersion>
<RxVersion>3.1.1</RxVersion>
<!--
TODO remove in next update of xunit

View File

@ -2,150 +2,290 @@ import { IConnection } from "../Microsoft.AspNetCore.SignalR.Client.TS/IConnecti
import { HubConnection } from "../Microsoft.AspNetCore.SignalR.Client.TS/HubConnection"
import { DataReceived, ConnectionClosed } from "../Microsoft.AspNetCore.SignalR.Client.TS/Common"
import { TransportType, ITransport } from "../Microsoft.AspNetCore.SignalR.Client.TS/Transports"
import { Observer } from "../Microsoft.AspNetCore.SignalR.Client.TS/Observable"
import { asyncit as it, captureException } from './JasmineUtils';
describe("HubConnection", () => {
it("completes pending invocations when stopped", async done => {
let connection: IConnection = {
start(transportType: TransportType | ITransport): Promise<void> {
return Promise.resolve();
},
describe("invoke", () => {
it("sends an invocation", async () => {
let connection = new TestConnection();
send(data: any): Promise<void> {
return Promise.resolve();
},
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.invoke("testMethod", "arg", 42)
.catch((_) => { }); // Suppress exception and unhandled promise rejection warning.
stop(): void {
if (this.onClosed) {
this.onClosed();
}
},
// Verify the message is sent
expect(connection.sentData.length).toBe(1);
expect(JSON.parse(connection.sentData[0])).toEqual({
type: 1,
invocationId: connection.lastInvocationId,
target: "testMethod",
nonblocking: false,
arguments: [
"arg",
42
]
});
onDataReceived: null,
onClosed: null
};
// Close the connection
hubConnection.stop();
});
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.invoke("testMethod");
hubConnection.stop();
it("rejects the promise when an error is received", async () => {
let connection = new TestConnection();
try {
await invokePromise;
fail();
}
catch (e) {
expect(e.message).toBe("Invocation cancelled due to connection being closed.");
}
done();
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.invoke("testMethod", "arg", 42);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" });
let ex = await captureException(async () => invokePromise);
expect(ex.message).toBe("foo");
});
it("resolves the promise when a result is received", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.invoke("testMethod", "arg", 42);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, result: "foo" });
expect(await invokePromise).toBe("foo");
});
it("completes pending invocations when stopped", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.invoke("testMethod");
hubConnection.stop();
let ex = await captureException(async () => await invokePromise);
expect(ex.message).toBe("Invocation cancelled due to connection being closed.");
});
it("completes pending invocations when connection is lost", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.invoke("testMethod");
// Typically this would be called by the transport
connection.onClosed(new Error("Connection lost"));
let ex = await captureException(async () => await invokePromise);
expect(ex.message).toBe("Connection lost");
});
it("rejects streaming responses made using 'invoke'", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod");
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: null });
connection.onClosed();
let ex = await captureException(async () => await invokePromise);
expect(ex.message).toBe("Streaming methods must be invoked using HubConnection.stream");
});
});
it("completes pending invocations when connection is lost", async done => {
let connection: IConnection = {
start(transportType: TransportType | ITransport): Promise<void> {
return Promise.resolve();
},
describe("stream", () => {
it("sends an invocation", async () => {
let connection = new TestConnection();
send(data: any): Promise<void> {
return Promise.resolve();
},
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.stream("testStream", "arg", 42);
stop(): void {
if (this.onClosed) {
this.onClosed();
}
},
// Verify the message is sent
expect(connection.sentData.length).toBe(1);
expect(JSON.parse(connection.sentData[0])).toEqual({
type: 1,
invocationId: connection.lastInvocationId,
target: "testStream",
nonblocking: false,
arguments: [
"arg",
42
]
});
onDataReceived: null,
onClosed: null
};
// Close the connection
hubConnection.stop();
});
let hubConnection = new HubConnection(connection);
var invokePromise = hubConnection.invoke("testMethod");
// Typically this would be called by the transport
connection.onClosed(new Error("Connection lost"));
it("completes with an error when an error is yielded", async () => {
let connection = new TestConnection();
try {
await invokePromise;
fail();
}
catch (e) {
expect(e.message).toBe("Connection lost");
}
done();
let hubConnection = new HubConnection(connection);
let observer = new TestObserver();
hubConnection.stream<any>("testMethod", "arg", 42)
.subscribe(observer);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" });
let ex = await captureException(async () => await observer.completed);
expect(ex.message).toEqual("Error: foo");
});
it("completes the observer when a completion is received", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let observer = new TestObserver();
hubConnection.stream<any>("testMethod", "arg", 42)
.subscribe(observer);
connection.receive({ type: 3, invocationId: connection.lastInvocationId });
expect(await observer.completed).toEqual([]);
});
it("completes pending streams when stopped", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let observer = new TestObserver();
hubConnection.stream<any>("testMethod")
.subscribe(observer);
hubConnection.stop();
let ex = await captureException(async () => await observer.completed);
expect(ex.message).toEqual("Error: Invocation cancelled due to connection being closed.");
});
it("completes pending streams when connection is lost", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let observer = new TestObserver();
hubConnection.stream<any>("testMethod")
.subscribe(observer);
// Typically this would be called by the transport
connection.onClosed(new Error("Connection lost"));
let ex = await captureException(async () => await observer.completed);
expect(ex.message).toEqual("Error: Connection lost");
});
it("rejects completion responses", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let observer = new TestObserver();
hubConnection.stream<any>("testMethod")
.subscribe(observer);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, result: "foo" });
let ex = await captureException(async () => await observer.completed);
expect(ex.message).toEqual("Error: Server provided a result in a completion response to a streamed invocation.");
});
it("yields items as they arrive", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let observer = new TestObserver();
hubConnection.stream<any>("testMethod")
.subscribe(observer);
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 1 });
expect(observer.itemsReceived).toEqual([1]);
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 2 });
expect(observer.itemsReceived).toEqual([1, 2]);
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 3 });
expect(observer.itemsReceived).toEqual([1, 2, 3]);
connection.receive({ type: 3, invocationId: connection.lastInvocationId });
expect(await observer.completed).toEqual([1, 2, 3]);
});
});
});
it("sends invocations as nonblocking", async done => {
let dataSent: string;
let connection: IConnection = {
start(transportType: TransportType): Promise<void> {
return Promise.resolve();
},
class TestConnection implements IConnection {
start(transportType: TransportType | ITransport): Promise<void> {
return Promise.resolve();
};
send(data: any): Promise<void> {
dataSent = data;
return Promise.resolve();
},
stop(): void {
if (this.onClosed) {
this.onClosed();
}
},
onDataReceived: null,
onClosed: null
};
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod");
expect(JSON.parse(dataSent).nonblocking).toBe(false);
// will clean pending promises
connection.onClosed();
try {
await invokePromise;
fail(); // exception is expected because the call has not completed
send(data: any): Promise<void> {
this.lastInvocationId = JSON.parse(data).invocationId;
if (this.sentData) {
this.sentData.push(data);
}
catch (e) {
else {
this.sentData = [data];
}
done();
});
return Promise.resolve();
};
it("rejects streaming responses", async done => {
let connection: IConnection = {
start(transportType: TransportType): Promise<void> {
return Promise.resolve();
},
send(data: any): Promise<void> {
return Promise.resolve();
},
stop(): void {
if (this.onClosed) {
this.onClosed();
}
},
onDataReceived: null,
onClosed: null
};
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod");
connection.onDataReceived("{ \"type\": 2, \"invocationId\": \"0\", \"result\": null }");
connection.onClosed();
try {
await invokePromise;
fail();
}
catch (e) {
expect(e.message).toBe("Streaming is not supported.");
stop(): void {
if (this.onClosed) {
this.onClosed();
}
};
done();
});
});
receive(data: any): void {
this.onDataReceived(JSON.stringify(data));
}
onDataReceived: DataReceived;
onClosed: ConnectionClosed;
sentData: [any];
lastInvocationId: string;
};
class TestObserver implements Observer<any>
{
public itemsReceived: [any];
private itemsSource: PromiseSource<[any]>;
get completed(): Promise<[any]> {
return this.itemsSource.promise;
}
constructor() {
this.itemsReceived = <[any]>[];
this.itemsSource = new PromiseSource<[any]>();
}
next(value: any) {
this.itemsReceived.push(value);
}
error(err: any) {
this.itemsSource.reject(new Error(err));
}
complete() {
this.itemsSource.resolve(this.itemsReceived);
}
};
class PromiseSource<T> {
public promise: Promise<T>
private resolver: (value?: T | PromiseLike<T>) => void;
private rejecter: (reason?: any) => void;
constructor() {
this.promise = new Promise<T>((resolve, reject) => {
this.resolver = resolve;
this.rejecter = reject;
});
}
resolve(value?: T | PromiseLike<T>) {
this.resolver(value);
}
reject(reason?: any) {
this.rejecter(reason);
}
}

View File

@ -0,0 +1,21 @@
export function asyncit(expectation: string, assertion?: () => Promise<any>, timeout?: number): void {
let testFunction: (done: DoneFn) => void;
if (assertion) {
testFunction = done => {
assertion()
.then(() => done())
.catch(() => fail());
};
}
it(expectation, testFunction, timeout);
}
export async function captureException(fn: () => Promise<any>): Promise<Error> {
try {
await fn();
return null;
} catch (e) {
return e;
}
}

View File

@ -2,7 +2,7 @@ import { ConnectionClosed } from "./Common"
import { IConnection } from "./IConnection"
import { Connection } from "./Connection"
import { TransportType } from "./Transports"
import { Subject, Observable } from "./Observable"
const enum MessageType {
Invocation = 1,
@ -22,7 +22,7 @@ interface InvocationMessage extends HubMessage {
}
interface ResultMessage extends HubMessage {
readonly result?: any;
readonly item?: any;
}
interface CompletionMessage extends HubMessage {
@ -35,7 +35,7 @@ export { TransportType } from "./Transports"
export class HubConnection {
private connection: IConnection;
private callbacks: Map<string, (invocationUpdate: CompletionMessage|ResultMessage) => void>;
private callbacks: Map<string, (invocationUpdate: CompletionMessage | ResultMessage) => void>;
private methods: Map<string, (...args: any[]) => void>;
private id: number;
private connectionClosedCallback: ConnectionClosed;
@ -55,7 +55,7 @@ export class HubConnection {
this.onConnectionClosed(error);
}
this.callbacks = new Map<string, (invocationEvent: CompletionMessage|ResultMessage) => void>();
this.callbacks = new Map<string, (invocationEvent: CompletionMessage | ResultMessage) => void>();
this.methods = new Map<string, (...args: any[]) => void>();
this.id = 0;
}
@ -73,12 +73,14 @@ export class HubConnection {
this.InvokeClientMethod(<InvocationMessage>message);
break;
case MessageType.Result:
// TODO: Streaming (MessageType.Result) currently not supported - callback will throw
case MessageType.Completion:
let callback = this.callbacks.get(message.invocationId);
if (callback != null) {
callback(message);
this.callbacks.delete(message.invocationId);
if (message.type == MessageType.Completion) {
this.callbacks.delete(message.invocationId);
}
}
break;
default:
@ -125,17 +127,42 @@ export class HubConnection {
return this.connection.stop();
}
invoke(methodName: string, ...args: any[]): Promise<any> {
let id = this.id;
this.id++;
stream<T>(methodName: string, ...args: any[]): Observable<T> {
let invocationDescriptor = this.createInvocation(methodName, args);
let invocationDescriptor: InvocationMessage = {
type: MessageType.Invocation,
invocationId: id.toString(),
target: methodName,
arguments: args,
nonblocking: false
};
let subject = new Subject<T>();
this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => {
if (invocationEvent.type === MessageType.Completion) {
let completionMessage = <CompletionMessage>invocationEvent;
if (completionMessage.error) {
subject.error(new Error(completionMessage.error));
}
else if(completionMessage.result) {
subject.error(new Error("Server provided a result in a completion response to a streamed invocation."));
}
else {
// TODO: Log a warning if there's a payload?
subject.complete();
}
}
else {
subject.next(<T>(<ResultMessage>invocationEvent).item);
}
});
//TODO: separate conversion to enable different data formats
this.connection.send(JSON.stringify(invocationDescriptor))
.catch(e => {
subject.error(e);
this.callbacks.delete(invocationDescriptor.invocationId);
});
return subject;
}
invoke(methodName: string, ...args: any[]): Promise<any> {
let invocationDescriptor = this.createInvocation(methodName, args);
let p = new Promise<any>((resolve, reject) => {
this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => {
@ -149,7 +176,7 @@ export class HubConnection {
}
}
else {
reject(new Error("Streaming is not supported."))
reject(new Error("Streaming methods must be invoked using HubConnection.stream"))
}
});
@ -171,4 +198,17 @@ export class HubConnection {
set onClosed(callback: ConnectionClosed) {
this.connectionClosedCallback = callback;
}
private createInvocation(methodName: string, args: any[]): InvocationMessage {
let id = this.id;
this.id++;
return <InvocationMessage>{
type: MessageType.Invocation,
invocationId: id.toString(),
target: methodName,
arguments: args,
nonblocking: false
};
}
}

View File

@ -13,6 +13,7 @@
<ItemGroup>
<Inputs Include="*.ts;" />
<Inputs Remove="Observable.ts" />
<Outputs Include="@(Inputs -> '$(SignalRClientDistFolder)src\%(FileName).d.ts')" />
<Outputs Include="@(Inputs -> '$(SignalRClientDistFolder)src\%(FileName).js')" />
<Outputs Include="$(SignalRClientDistFolder)browser\signalr-client.js" />

View File

@ -0,0 +1,43 @@
// TODO: Seamless RxJs integration
// From RxJs: https://github.com/ReactiveX/rxjs/blob/master/src/Observer.ts
export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
export interface Observable<T> {
// TODO: Return a Subscription so the caller can unsubscribe? IDisposable in System.IObservable
subscribe(observer: Observer<T>): void;
}
export class Subject<T> implements Observable<T> {
observers: Observer<T>[];
constructor() {
this.observers = [];
}
public next(item: T): void {
for (let observer of this.observers) {
observer.next(item);
}
}
public error(err: any): void {
for (let observer of this.observers) {
observer.error(err);
}
}
public complete(): void {
for (let observer of this.observers) {
observer.complete();
}
}
public subscribe(observer: Observer<T>): void {
this.observers.push(observer);
}
}

View File

@ -26,4 +26,4 @@ gulp.task('browserify-client', ['compile-ts-client'], () => {
gulp.task('build-ts-client', ['clean', 'compile-ts-client', 'browserify-client']);
gulp.task('default', ['build-ts-client']);
gulp.task('default', ['build-ts-client']);

View File

@ -15,6 +15,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Microsoft.AspNetCore.StaticFiles" Version="$(AspNetCoreVersion)" />
<PackageReference Include="System.Reactive.Linq" Version="$(RxVersion)" />
</ItemGroup>
<!-- these targets relies on npm packages being restored when building the TS client -->

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -16,6 +16,7 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server
public static void Main(string[] args)
{
var host = new WebHostBuilder()
.UseSetting(WebHostDefaults.PreventHostingStartupKey, "true") // Work around https://github.com/aspnet/Hosting/issues/1075
.ConfigureLogging(factory =>
{
factory.AddConsole();

View File

@ -4,7 +4,6 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.SignalR.Test.Server
{

View File

@ -1,7 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Test.Server
@ -22,5 +24,10 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server
{
return Clients.Client(Context.Connection.ConnectionId).InvokeAsync("Message", message);
}
public IObservable<string> Stream()
{
return new string[] { "a", "b", "c" }.ToObservable();
}
}
}

View File

@ -1,4 +1,4 @@
<!DOCTYPE html>
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
@ -16,4 +16,4 @@
</head>
<body>
</body>
</html>
</html>

View File

@ -2,85 +2,145 @@ const TESTHUBENDPOINT_URL = `http://${document.location.host}/testhub`;
describe('hubConnection', () => {
eachTransport(transportType => {
it(`over ${signalR.TransportType[transportType]} can invoke server method and receive result`, done => {
const message = "Hi";
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
hubConnection.onClosed = error => {
expect(error).toBe(undefined);
done();
}
describe(`${signalR.TransportType[transportType]} transport`, () => {
it(`can invoke server method and receive result`, done => {
const message = "Hi";
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
hubConnection.onClosed = error => {
expect(error).toBe(undefined);
done();
}
hubConnection.start(transportType)
.then(() => {
hubConnection.invoke('Echo', message)
.then(result => {
expect(result).toBe(message);
})
.catch(() => {
fail();
})
hubConnection.start(transportType)
.then(() => {
hubConnection.stop();
hubConnection.invoke('Echo', message)
.then(result => {
expect(result).toBe(message);
})
.catch(e => {
fail(e);
})
.then(() => {
hubConnection.stop();
})
})
})
.catch(() => {
fail();
done();
});
});
it(`over ${signalR.TransportType[transportType]} rethrows an exception from the server`, done => {
const errorMessage = "An error occurred.";
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
hubConnection.start(transportType)
.then(() => {
hubConnection.invoke('ThrowException', errorMessage)
.then(() => {
// exception expected but none thrown
fail();
})
.catch(e => {
expect(e.message).toBe(errorMessage);
})
.then(() => {
return hubConnection.stop();
})
.then(() => {
done();
});
})
.catch(() => {
fail();
done();
});
});
it(`over ${signalR.TransportType[transportType]} can receive server calls`, done => {
let client = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
const message = "Hello SignalR";
let callbackPromise = new Promise((resolve, reject) => {
client.on("Message", msg => {
expect(msg).toBe(message);
resolve();
});
.catch(e => {
fail(e);
done();
});
});
client.start(transportType)
.then(() => {
return Promise.all([client.invoke('InvokeWithString', message), callbackPromise]);
})
.then(() => {
return stop();
})
.then(() => {
done();
})
.catch(e => {
fail();
it(`can stream server method and receive result`, done => {
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
hubConnection.onClosed = error => {
expect(error).toBe(undefined);
done();
}
let received = [];
hubConnection.start(transportType)
.then(() => {
hubConnection.stream('Stream')
.subscribe({
next: (item) => {
received.push(item);
},
error: (err) => {
fail(err);
done();
},
complete: () => {
expect(received).toEqual(["a", "b", "c"]);
done();
}
});
})
.catch(e => {
fail(e);
done();
});
});
it(`rethrows an exception from the server when invoking`, done => {
const errorMessage = "An error occurred.";
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
hubConnection.start(transportType)
.then(() => {
hubConnection.invoke('ThrowException', errorMessage)
.then(() => {
// exception expected but none thrown
fail();
})
.catch(e => {
expect(e.message).toBe(errorMessage);
})
.then(() => {
return hubConnection.stop();
})
.then(() => {
done();
});
})
.catch(e => {
fail(e);
done();
});
});
it(`rethrows an exception from the server when streaming`, done => {
const errorMessage = "An error occurred.";
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
hubConnection.start(transportType)
.then(() => {
hubConnection.stream('ThrowException', errorMessage)
.subscribe({
next: (item) => {
fail();
},
error: (err) => {
expect(err.message).toEqual("An error occurred.");
done();
},
complete: () => {
fail();
}
});
})
.catch(e => {
fail(e);
done();
});
});
it(`can receive server calls`, done => {
let client = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
const message = "Hello SignalR";
let callbackPromise = new Promise((resolve, reject) => {
client.on("Message", msg => {
expect(msg).toBe(message);
resolve();
});
});
client.start(transportType)
.then(() => {
return Promise.all([client.invoke('InvokeWithString', message), callbackPromise]);
})
.then(() => {
return stop();
})
.then(() => {
done();
})
.catch(e => {
fail(e);
done();
});
});
});
});
});
});

View File

@ -1,18 +1,13 @@
describe('WebSockets', function () {
describe('WebSockets', function () {
it('can be used to connect to SignalR', done => {
const message = "message";
let webSocket = new WebSocket(ECHOENDPOINT_URL.replace(/^http/, "ws") + '/ws');
let webSocket = new WebSocket(ECHOENDPOINT_URL.replace(/^http/, "ws"));
webSocket.onopen = () => {
webSocket.send(message);
};
webSocket.onerror = event => {
fail();
done();
};
var received = "";
webSocket.onmessage = event => {
received += event.data;
@ -22,8 +17,14 @@
};
webSocket.onclose = event => {
if (!event.wasClean) {
fail("connection closed with unexpected status code: " + event.code + " " + event.reason);
}
// Jasmine doesn't like tests without expectations
expect(event.wasClean).toBe(true);
done();
};
});
});
});

View File

@ -0,0 +1,79 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR;
namespace SocketsSample.Hubs
{
public class Streaming : Hub
{
public IObservable<int> ObservableCounter(int count, int delay)
{
return new CounterObservable(count, delay);
}
public ReadableChannel<int> ChannelCounter(int count, int delay)
{
var channel = Channel.CreateUnbounded<int>();
Task.Run(async () =>
{
for (var i = 0; i < count; i++)
{
await channel.Out.WriteAsync(i);
await Task.Delay(delay);
}
channel.Out.TryComplete();
});
return channel.In;
}
private class CounterObservable : IObservable<int>
{
private int _count;
private int _delay;
public CounterObservable(int count, int delay)
{
_count = count;
_delay = delay;
}
public IDisposable Subscribe(IObserver<int> observer)
{
// Run in a thread-pool thread
var cts = new CancellationTokenSource();
Task.Run(async () =>
{
for (var i = 0; !cts.Token.IsCancellationRequested && i < _count; i++)
{
observer.OnNext(i);
await Task.Delay(_delay);
}
observer.OnCompleted();
});
return new Disposable(() => cts.Cancel());
}
}
private class Disposable : IDisposable
{
private Action _action;
public Disposable(Action action)
{
_action = action;
}
public void Dispose()
{
_action();
}
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.Builder;
@ -49,6 +49,7 @@ namespace SocketsSample
app.UseSignalR(routes =>
{
routes.MapHub<Chat>("hubs");
routes.MapHub<Streaming>("streaming");
});
app.UseSockets(routes =>

View File

@ -1,4 +1,4 @@
<!DOCTYPE html>
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
@ -55,13 +55,6 @@
<script src="utils.js"></script>
<script>
var isConnected = false;
function click(id, callback) {
document.getElementById(id).addEventListener('click', event => {
callback(event);
event.preventDefault();
});
}
function invoke(connection, method, ...args) {
if (!isConnected) {
return;
@ -72,11 +65,11 @@ function invoke(connection, method, ...args) {
console.log("invocation completed successfully: " + (result === null ? '(null)' : result));
if (result) {
addLine(result);
addLine('message', result);
}
})
.catch(err => {
addLine(err, 'red');
addLine('message', err, 'red');
});
}
@ -84,30 +77,21 @@ function getText(id) {
return document.getElementById(id).value;
}
function addLine(line, color) {
var child = document.createElement('li');
if (color) {
child.style.color = color;
}
child.innerText = line;
document.getElementById('messages').appendChild(child);
}
let transportType = signalR.TransportType[getParameterByName('transport')] || signalR.TransportType.WebSockets;
document.getElementById('head1').innerHTML = signalR.TransportType[transportType];
let connection = new signalR.HubConnection(`http://${document.location.host}/hubs`, 'formatType=json&format=text');
connection.on('Send', msg => {
addLine(msg);
addLine('message', msg);
});
connection.onClosed = e => {
if (e) {
addLine('Connection closed with error: ' + e, 'red');
addLine('message', 'Connection closed with error: ' + e, 'red');
}
else {
addLine('Disconnected', 'green');
addLine('message', 'Disconnected', 'green');
}
}
@ -115,10 +99,10 @@ click('connect', event => {
connection.start(transportType)
.then(() => {
isConnected = true;
addLine('Connected successfully', 'green');
addLine('message', 'Connected successfully', 'green');
})
.catch(err => {
addLine(err, 'red');
addLine('message', err, 'red');
});
});
@ -155,4 +139,4 @@ click('send', event => {
invoke(connection, 'Echo', data);
});
</script>
</script>

View File

@ -1,22 +1,28 @@
<!DOCTYPE html>
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
</head>
<body>
<h1>ASP.NET Sockets</h1>
<h1>ASP.NET Core Sockets</h1>
<ul>
<li><a href="sockets.html?transport=LongPolling">Long polling</a></li>
<li><a href="sockets.html?transport=ServerSentEvents">Server Sent Events</a></li>
<li><a href="sockets.html?transport=WebSockets">Web Sockets</a></li>
<li><a href="ws.html">Web Sockets (using only Browser APIs)</a></li>
</ul>
<h1>ASP.NET SignalR (Hubs)</h1>
<h1>ASP.NET Core SignalR (Hubs)</h1>
<ul>
<li><a href="hubs.html?transport=LongPolling">Long polling</a></li>
<li><a href="hubs.html?transport=ServerSentEvents">Server Sent Events</a></li>
<li><a href="hubs.html?transport=WebSockets">Web Sockets</a></li>
</ul>
<h1>ASP.NET Core SignalR (Streaming)</h1>
<ul>
<li><a href="streaming.html?transport=LongPolling">Long polling</a></li>
<li><a href="streaming.html?transport=ServerSentEvents">Server Sent Events</a></li>
<li><a href="streaming.html?transport=WebSockets">Web Sockets</a></li>
</ul>
</body>
</html>

View File

@ -0,0 +1,105 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
</head>
<body>
<h1 id="transportName">Unknown Transport</h1>
<h2>Controls</h2>
<div>
<button id="connectButton" type="button">Connect</button>
<button id="disconnectButton" type="button" disabled>Disconnect</button>
<button id="clearButton" type="button">Clear</button>
</div>
<div>
<button id="observableButton" name="observable" type="button" disabled>From Observable</button>
<button id="channelButton" name="channel" type="button" disabled>From Channel</button>
</div>
<h2>Results</h2>
<ul id="resultsList"></ul>
<ul id="messages"></ul>
<script src="lib/signalr-client/signalr-client.js"></script>
<script src="utils.js"></script>
<script>
document.addEventListener('DOMContentLoaded', () => {
let resultsList = document.getElementById('resultsList');
let channelButton = document.getElementById('channelButton');
let observableButton = document.getElementById('observableButton');
let clearButton = document.getElementById('clearButton');
let connectButton = document.getElementById('connectButton');
let disconnectButton = document.getElementById('disconnectButton');
let transportType = signalR.TransportType[getParameterByName('transport')] || signalR.TransportType.WebSockets;
let invocationCounter = 0;
document.getElementById('transportName').innerHTML = signalR.TransportType[transportType];
let url = `http://${document.location.host}/streaming`
let connection = null;
click('clearButton', function () {
resultsList.innerHTML = '';
});
click('disconnectButton', function () {
connection.stop();
});
click('connectButton', function () {
connection = new signalR.HubConnection(url, 'formatType=json&format=text');
connection.onClosed = function () {
channelButton.disabled = true;
observableButton.disabled = true;
connectButton.disabled = false;
disconnectButton.disabled = true;
addLine('resultsList', 'disconnected', 'green');
};
connection.start(transportType)
.then(function () {
channelButton.disabled = false;
observableButton.disabled = false;
connectButton.disabled = true;
disconnectButton.disabled = false;
addLine('resultsList', 'connected', 'green');
});
});
click('observableButton', function () {
run('ObservableCounter')
});
click('channelButton', function () {
run('ChannelCounter')
});
function run(method) {
let id = invocationCounter;
invocationCounter += 1;
addLine('resultsList', `running ${method}(${id}) ...`);
connection.stream(method, 10, (Math.random() * 5) * 200).subscribe({
closed: false,
next: function (item) {
addLine('resultsList', `${method}(${id}): ${item}`);
},
error: function (err) {
addLine('resultsList', `${method}(${id}): ${err}`, 'red');
},
complete: function () {
addLine('resultsList', `${method}(${id}): complete`, 'green');
}
});
}
});
</script>
</body>
</html>

View File

@ -10,3 +10,19 @@ function getParameterByName(name, url) {
return decodeURIComponent(results[2].replace(/\+/g, " "));
}
function click(id, callback) {
document.getElementById(id).addEventListener('click', event => {
callback(event);
event.preventDefault();
});
}
function addLine(listId, line, color) {
var child = document.createElement('li');
if (color) {
child.style.color = color;
}
child.innerText = line;
document.getElementById(listId).appendChild(child);
}

View File

@ -0,0 +1,54 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
namespace Microsoft.AspNetCore.SignalR.Client
{
internal class CastObservable<TResult> : IObservable<TResult>
{
private IObservable<object> _innerObservable;
public CastObservable(IObservable<object> innerObservable)
{
_innerObservable = innerObservable;
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
return _innerObservable.Subscribe(new CastObserver(observer));
}
private class CastObserver : IObserver<object>
{
private IObserver<TResult> _innerObserver;
public CastObserver(IObserver<TResult> innerObserver)
{
_innerObserver = innerObserver;
}
public void OnCompleted()
{
_innerObserver.OnCompleted();
}
public void OnError(Exception error)
{
_innerObserver.OnError(error);
}
public void OnNext(object value)
{
try
{
_innerObserver.OnNext((TResult)value);
}
catch(Exception ex)
{
_innerObserver.OnError(ex);
}
}
}
}
}

View File

@ -8,6 +8,7 @@ using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.Sockets;
@ -115,17 +116,30 @@ namespace Microsoft.AspNetCore.SignalR.Client
_handlers.AddOrUpdate(methodName, invocationHandler, (_, __) => invocationHandler);
}
public async Task<object> Invoke(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
public ReadableChannel<object> Stream(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
{
var irq = InvocationRequest.Stream(cancellationToken, returnType, GetNextId(), _loggerFactory, out var channel);
InvokeCore(methodName, irq, args);
return channel;
}
public Task<object> Invoke(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
{
var irq = InvocationRequest.Invoke(cancellationToken, returnType, GetNextId(), _loggerFactory, out var task);
InvokeCore(methodName, irq, args);
return task;
}
private void InvokeCore(string methodName, InvocationRequest irq, object[] args)
{
ThrowIfConnectionTerminated();
_logger.LogTrace("Preparing invocation of '{target}', with return type '{returnType}' and {argumentCount} args", methodName, returnType.AssemblyQualifiedName, args.Length);
_logger.LogTrace("Preparing invocation of '{target}', with return type '{returnType}' and {argumentCount} args", methodName, irq.ResultType.AssemblyQualifiedName, args.Length);
// Create an invocation descriptor. Client invocations are always blocking
var invocationMessage = new InvocationMessage(GetNextId(), nonBlocking: false, target: methodName, arguments: args);
var invocationMessage = new InvocationMessage(irq.InvocationId, nonBlocking: false, target: methodName, arguments: args);
// I just want an excuse to use 'irq' as a variable name...
_logger.LogDebug("Registering Invocation ID '{invocationId}' for tracking", invocationMessage.InvocationId);
var irq = new InvocationRequest(cancellationToken, returnType, invocationMessage.InvocationId, _loggerFactory);
AddInvocation(irq);
@ -133,16 +147,22 @@ namespace Microsoft.AspNetCore.SignalR.Client
if (_logger.IsEnabled(LogLevel.Trace))
{
var argsList = string.Join(", ", args.Select(a => a.GetType().FullName));
_logger.LogTrace("Issuing Invocation '{invocationId}': {returnType} {methodName}({args})", invocationMessage.InvocationId, returnType.FullName, methodName, argsList);
_logger.LogTrace("Issuing Invocation '{invocationId}': {returnType} {methodName}({args})", invocationMessage.InvocationId, irq.ResultType.FullName, methodName, argsList);
}
// We don't need to wait for this to complete. It will signal back to the invocation request.
_ = SendInvocation(invocationMessage, irq);
}
private async Task SendInvocation(InvocationMessage invocationMessage, InvocationRequest irq)
{
try
{
var payload = await _protocol.WriteToArrayAsync(invocationMessage);
_logger.LogInformation("Sending Invocation '{invocationId}'", invocationMessage.InvocationId);
await _connection.SendAsync(payload, _protocol.MessageType, cancellationToken);
await _connection.SendAsync(payload, _protocol.MessageType, irq.CancellationToken);
_logger.LogInformation("Sending Invocation '{invocationId}' complete", invocationMessage.InvocationId);
}
catch (Exception ex)
@ -151,9 +171,6 @@ namespace Microsoft.AspNetCore.SignalR.Client
irq.Fail(ex);
TryRemoveInvocation(invocationMessage.InvocationId, out _);
}
// Return the completion task. It will be completed by ReceiveMessages when the response is received.
return await irq.Completion;
}
private void OnDataReceived(byte[] data, MessageType messageType)
@ -182,12 +199,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
break;
case StreamItemMessage streamItem:
// Complete the invocation with an error, we don't support streaming (yet)
if (!TryRemoveInvocation(streamItem.InvocationId, out irq))
if (!TryGetInvocation(streamItem.InvocationId, out irq))
{
_logger.LogWarning("Dropped unsolicited Stream Item message for invocation '{invocationId}'", streamItem.InvocationId);
return;
}
irq.Fail(new NotSupportedException("Streaming method results are not supported"));
DispatchInvocationStreamItemAsync(streamItem, irq);
break;
default:
throw new InvalidOperationException($"Unknown message type: {message.GetType().FullName}");
@ -236,6 +253,22 @@ namespace Microsoft.AspNetCore.SignalR.Client
handler.Handler(invocation.Arguments);
}
// This async void is GROSS but we need to dispatch asynchronously because we're writing to a Channel
// and there's nobody to actually wait for us to finish.
private async void DispatchInvocationStreamItemAsync(StreamItemMessage streamItem, InvocationRequest irq)
{
_logger.LogTrace("Received StreamItem for Invocation #{invocationId}", streamItem.InvocationId);
if (irq.CancellationToken.IsCancellationRequested)
{
_logger.LogTrace("Canceling dispatch of StreamItem message for Invocation {invocationId}. The invocation was cancelled.", irq.InvocationId);
}
else if (!await irq.StreamItem(streamItem.Item))
{
_logger.LogWarning("Invocation {invocationId} received stream item after channel was closed.", irq.InvocationId);
}
}
private void DispatchInvocationCompletion(CompletionMessage completion, InvocationRequest irq)
{
_logger.LogTrace("Received Completion for Invocation #{invocationId}", completion.InvocationId);
@ -352,53 +385,5 @@ namespace Microsoft.AspNetCore.SignalR.Client
ParameterTypes = parameterTypes;
}
}
private class InvocationRequest : IDisposable
{
private readonly TaskCompletionSource<object> _completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly CancellationTokenRegistration _cancellationTokenRegistration;
private readonly ILogger _logger;
public Type ResultType { get; }
public CancellationToken CancellationToken { get; }
public string InvocationId { get; }
public Task<object> Completion => _completionSource.Task;
public InvocationRequest(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<InvocationRequest>();
_cancellationTokenRegistration = cancellationToken.Register(() => _completionSource.TrySetCanceled());
InvocationId = invocationId;
CancellationToken = cancellationToken;
ResultType = resultType;
_logger.LogTrace("Invocation {invocationId} created", InvocationId);
}
public void Fail(Exception exception)
{
_logger.LogTrace("Invocation {invocationId} marked as failed", InvocationId);
_completionSource.TrySetException(exception);
}
public void Complete(object result)
{
_logger.LogTrace("Invocation {invocationId} marked as completed", InvocationId);
_completionSource.TrySetResult(result);
}
public void Dispose()
{
_logger.LogTrace("Invocation {invocationId} disposed", InvocationId);
// Just in case it hasn't already been completed
_completionSource.TrySetCanceled();
_cancellationTokenRegistration.Dispose();
}
}
}
}

View File

@ -1,9 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
namespace Microsoft.AspNetCore.SignalR.Client
{
@ -35,6 +36,59 @@ namespace Microsoft.AspNetCore.SignalR.Client
return (TResult)await hubConnection.Invoke(methodName, typeof(TResult), cancellationToken, args);
}
public static ReadableChannel<TResult> Stream<TResult>(this HubConnection hubConnection, string methodName, params object[] args) =>
Stream<TResult>(hubConnection, methodName, CancellationToken.None, args);
public static ReadableChannel<TResult> Stream<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken, params object[] args)
{
if (hubConnection == null)
{
throw new ArgumentNullException(nameof(hubConnection));
}
var inputChannel = hubConnection.Stream(methodName, typeof(TResult), cancellationToken, args);
var outputChannel = Channel.CreateUnbounded<TResult>();
// Local function to provide a way to run async code as fire-and-forget
// The output channel is how we signal completion to the caller.
async Task RunChannel()
{
try
{
while (await inputChannel.WaitToReadAsync())
{
while (inputChannel.TryRead(out var item))
{
while (!outputChannel.Out.TryWrite((TResult)item))
{
if (!await outputChannel.Out.WaitToWriteAsync())
{
// Failed to write to the output channel because it was closed. Nothing really we can do but abort here.
return;
}
}
}
}
// Manifest any errors in the completion task
await inputChannel.Completion;
}
catch (Exception ex)
{
outputChannel.Out.TryComplete(ex);
}
finally
{
// This will safely no-op if the catch block above ran.
outputChannel.Out.TryComplete();
}
}
_ = RunChannel();
return outputChannel.In;
}
public static void On(this HubConnection hubConnection, string methodName, Action handler)
{
if (hubConnection == null)

View File

@ -0,0 +1,160 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.SignalR.Client
{
internal abstract class InvocationRequest : IDisposable
{
private readonly CancellationTokenRegistration _cancellationTokenRegistration;
protected ILogger Logger { get; }
public Type ResultType { get; }
public CancellationToken CancellationToken { get; }
public string InvocationId { get; }
protected InvocationRequest(CancellationToken cancellationToken, Type resultType, string invocationId, ILogger logger)
{
_cancellationTokenRegistration = cancellationToken.Register(self => ((InvocationRequest)self).Cancel(), this);
InvocationId = invocationId;
CancellationToken = cancellationToken;
ResultType = resultType;
Logger = logger;
Logger.LogTrace("Invocation {invocationId} created", InvocationId);
}
public static InvocationRequest Invoke(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, out Task<object> result)
{
var req = new NonStreaming(cancellationToken, resultType, invocationId, loggerFactory);
result = req.Result;
return req;
}
public static InvocationRequest Stream(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, out ReadableChannel<object> result)
{
var req = new Streaming(cancellationToken, resultType, invocationId, loggerFactory);
result = req.Result;
return req;
}
public abstract void Fail(Exception exception);
public abstract void Complete(object result);
public abstract ValueTask<bool> StreamItem(object item);
protected abstract void Cancel();
public virtual void Dispose()
{
Logger.LogTrace("Invocation {invocationId} disposed", InvocationId);
// Just in case it hasn't already been completed
Cancel();
_cancellationTokenRegistration.Dispose();
}
private class Streaming : InvocationRequest
{
private readonly Channel<object> _channel = Channel.CreateUnbounded<object>();
public Streaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory)
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<Streaming>())
{
}
public ReadableChannel<object> Result => _channel.In;
public override void Complete(object result)
{
Logger.LogTrace("Invocation {invocationId} marked as completed.", InvocationId);
if (result != null)
{
Logger.LogError("Invocation {invocationId} received a completion result, but was invoked as a streaming invocation.", InvocationId);
_channel.Out.TryComplete(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
}
else
{
_channel.Out.TryComplete();
}
}
public override void Fail(Exception exception)
{
Logger.LogTrace("Invocation {invocationId} marked as failed.", InvocationId);
_channel.Out.TryComplete(exception);
}
public override async ValueTask<bool> StreamItem(object item)
{
try
{
Logger.LogTrace("Invocation {invocationId} received stream item.", InvocationId);
while (!_channel.Out.TryWrite(item))
{
if (!await _channel.Out.WaitToWriteAsync())
{
return false;
}
}
}
catch (Exception ex)
{
Logger.LogError(ex, "Invocation {invocationId} caused an error trying to write a stream item.", InvocationId);
}
return true;
}
protected override void Cancel()
{
_channel.Out.TryComplete(new OperationCanceledException("Connection terminated"));
}
}
private class NonStreaming : InvocationRequest
{
private readonly TaskCompletionSource<object> _completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
public NonStreaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory)
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<NonStreaming>())
{
}
public Task<object> Result => _completionSource.Task;
public override void Complete(object result)
{
Logger.LogTrace("Invocation {invocationId} marked as completed.", InvocationId);
_completionSource.TrySetResult(result);
}
public override void Fail(Exception exception)
{
Logger.LogTrace("Invocation {invocationId} marked as failed.", InvocationId);
_completionSource.TrySetException(exception);
}
public override ValueTask<bool> StreamItem(object item)
{
Logger.LogError("Invocation {invocationId} received stream item but was invoked as a non-streamed invocation.", InvocationId);
_completionSource.TrySetException(new InvalidOperationException("Streaming methods must be invoked using HubConnection.Stream"));
// We "delivered" the stream item successfully as far as the caller cares
return new ValueTask<bool>(true);
}
protected override void Cancel()
{
_completionSource.TrySetCanceled();
}
}
}
}

View File

@ -34,5 +34,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
public static CompletionMessage WithError(string invocationId, string error) => new CompletionMessage(invocationId, error, result: null, hasResult: false);
public static CompletionMessage WithResult(string invocationId, object payload) => new CompletionMessage(invocationId, error: null, result: payload, hasResult: true);
public static CompletionMessage Empty(string invocationId) => new CompletionMessage(invocationId, error: null, result: null, hasResult: false);
}
}

View File

@ -13,6 +13,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
public class JsonHubProtocol : IHubProtocol
{
private const string ResultPropertyName = "result";
private const string ItemPropertyName = "item";
private const string InvocationIdPropertyName = "invocationId";
private const string TypePropertyName = "type";
private const string ErrorPropertyName = "error";
@ -117,7 +118,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
WriteInvocationMessage(m, writer);
break;
case StreamItemMessage m:
WriteResultMessage(m, writer);
WriteStreamItemMessage(m, writer);
break;
case CompletionMessage m:
WriteCompletionMessage(m, writer);
@ -145,11 +146,11 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
writer.WriteEndObject();
}
private void WriteResultMessage(StreamItemMessage message, JsonTextWriter writer)
private void WriteStreamItemMessage(StreamItemMessage message, JsonTextWriter writer)
{
writer.WriteStartObject();
WriteHubMessageCommon(message, writer, ResultMessageType);
writer.WritePropertyName(ResultPropertyName);
writer.WritePropertyName(ItemPropertyName);
_payloadSerializer.Serialize(writer, message.Item);
writer.WriteEndObject();
}
@ -216,7 +217,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private StreamItemMessage BindResultMessage(JObject json, IInvocationBinder binder)
{
var invocationId = GetRequiredProperty<string>(json, InvocationIdPropertyName, JTokenType.String);
var result = GetRequiredProperty<JToken>(json, ResultPropertyName);
var result = GetRequiredProperty<JToken>(json, ItemPropertyName);
var returnType = binder.GetReturnType(invocationId);
return new StreamItemMessage(invocationId, result?.ToObject(returnType, _payloadSerializer));

View File

@ -3,10 +3,12 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.Sockets;
@ -220,8 +222,7 @@ namespace Microsoft.AspNetCore.SignalR
}
else
{
var result = await Invoke(descriptor, connection, invocationMessage);
await SendMessageAsync(connection, protocol, result);
await Invoke(descriptor, connection, protocol, invocationMessage);
}
}
@ -243,7 +244,7 @@ namespace Microsoft.AspNetCore.SignalR
throw new OperationCanceledException("Outbound channel was closed while trying to write hub message");
}
private async Task<CompletionMessage> Invoke(HubMethodDescriptor descriptor, ConnectionContext connection, InvocationMessage invocationMessage)
private async Task Invoke(HubMethodDescriptor descriptor, ConnectionContext connection, IHubProtocol protocol, InvocationMessage invocationMessage)
{
var methodExecutor = descriptor.MethodExecutor;
@ -257,7 +258,9 @@ namespace Microsoft.AspNetCore.SignalR
InitializeHub(hub, connection);
object result = null;
if (methodExecutor.IsMethodAsync)
// ReadableChannel is awaitable but we don't want to await it.
if (methodExecutor.IsMethodAsync && !IsChannel(methodExecutor.MethodReturnType, out _))
{
if (methodExecutor.MethodReturnType == typeof(Task))
{
@ -273,17 +276,26 @@ namespace Microsoft.AspNetCore.SignalR
result = methodExecutor.Execute(hub, invocationMessage.Arguments);
}
return CompletionMessage.WithResult(invocationMessage.InvocationId, result);
if (IsStreamed(methodExecutor, result, methodExecutor.MethodReturnType, out var enumerator))
{
_logger.LogTrace("[{connectionId}/{invocationId}] Streaming result of type {resultType}", connection.ConnectionId, invocationMessage.InvocationId, methodExecutor.MethodReturnType.FullName);
await StreamResultsAsync(invocationMessage.InvocationId, connection, protocol, enumerator);
}
else
{
_logger.LogTrace("[{connectionId}/{invocationId}] Sending result of type {resultType}", connection.ConnectionId, invocationMessage.InvocationId, methodExecutor.MethodReturnType.FullName);
await SendMessageAsync(connection, protocol, CompletionMessage.WithResult(invocationMessage.InvocationId, result));
}
}
catch (TargetInvocationException ex)
{
_logger.LogError(0, ex, "Failed to invoke hub method");
return CompletionMessage.WithError(invocationMessage.InvocationId, ex.InnerException.Message);
await SendMessageAsync(connection, protocol, CompletionMessage.WithError(invocationMessage.InvocationId, ex.InnerException.Message));
}
catch (Exception ex)
{
_logger.LogError(0, ex, "Failed to invoke hub method");
return CompletionMessage.WithError(invocationMessage.InvocationId, ex.Message);
await SendMessageAsync(connection, protocol, CompletionMessage.WithError(invocationMessage.InvocationId, ex.Message));
}
finally
{
@ -299,6 +311,74 @@ namespace Microsoft.AspNetCore.SignalR
hub.Groups = new GroupManager<THub>(connection, _lifetimeManager);
}
private bool IsChannel(Type type, out Type payloadType)
{
var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ReadableChannel<>));
if (channelType == null)
{
payloadType = null;
return false;
}
else
{
payloadType = channelType.GetGenericArguments()[0];
return true;
}
}
private async Task StreamResultsAsync(string invocationId, ConnectionContext connection, IHubProtocol protocol, IAsyncEnumerator<object> enumerator)
{
// TODO: Cancellation? See https://github.com/aspnet/SignalR/issues/481
try
{
while (await enumerator.MoveNextAsync())
{
// Send the stream item
await SendMessageAsync(connection, protocol, new StreamItemMessage(invocationId, enumerator.Current));
}
await SendMessageAsync(connection, protocol, CompletionMessage.Empty(invocationId));
}
catch (Exception ex)
{
await SendMessageAsync(connection, protocol, CompletionMessage.WithError(invocationId, ex.Message));
}
}
private bool IsStreamed(ObjectMethodExecutor methodExecutor, object result, Type resultType, out IAsyncEnumerator<object> enumerator)
{
if (result == null)
{
enumerator = null;
return false;
}
var observableInterface = IsIObservable(resultType) ?
resultType :
resultType.GetInterfaces().FirstOrDefault(IsIObservable);
if (observableInterface != null)
{
enumerator = AsyncEnumeratorAdapters.FromObservable(result, observableInterface);
return true;
}
else if (IsChannel(resultType, out var payloadType))
{
enumerator = AsyncEnumeratorAdapters.FromChannel(result, payloadType);
return true;
}
else
{
// Not streamed
enumerator = null;
return false;
}
}
private static bool IsIObservable(Type iface)
{
return iface.IsGenericType && iface.GetGenericTypeDefinition() == typeof(IObservable<>);
}
private void DiscoverHubMethods()
{
var hubType = typeof(THub);

View File

@ -0,0 +1,124 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
namespace Microsoft.AspNetCore.SignalR.Internal
{
// True-internal because this is a weird and tricky class to use :)
internal static class AsyncEnumeratorAdapters
{
private static readonly MethodInfo _boxEnumeratorMethod = typeof(AsyncEnumeratorAdapters)
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(BoxEnumerator)) && m.IsGenericMethod);
private static readonly MethodInfo _fromObservableMethod = typeof(AsyncEnumeratorAdapters)
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(FromObservable)) && m.IsGenericMethod);
private static readonly object[] _getAsyncEnumeratorArgs = new object[] { CancellationToken.None };
public static IAsyncEnumerator<object> FromObservable(object observable, Type observableInterface)
{
// TODO: Cache expressions by observable.GetType()?
return (IAsyncEnumerator<object>)_fromObservableMethod
.MakeGenericMethod(observableInterface.GetGenericArguments())
.Invoke(null, new[] { observable });
}
public static IAsyncEnumerator<object> FromObservable<T>(IObservable<T> observable)
{
// TODO: Allow bounding and optimizations?
var channel = Channel.CreateUnbounded<object>();
var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Out, CancellationToken.None));
return channel.In.GetAsyncEnumerator();
}
public static IAsyncEnumerator<object> FromChannel(object readableChannelOfT, Type payloadType)
{
var enumerator = readableChannelOfT
.GetType()
.GetRuntimeMethod("GetAsyncEnumerator", new[] { typeof(CancellationToken) })
.Invoke(readableChannelOfT, _getAsyncEnumeratorArgs);
if (payloadType.IsValueType)
{
return (IAsyncEnumerator<object>)_boxEnumeratorMethod
.MakeGenericMethod(payloadType)
.Invoke(null, new[] { enumerator });
}
else
{
return (IAsyncEnumerator<object>)enumerator;
}
}
private static IAsyncEnumerator<object> BoxEnumerator<T>(IAsyncEnumerator<T> input) where T : struct
{
return new BoxingEnumerator<T>(input);
}
private class ChannelObserver<T> : IObserver<T>
{
private WritableChannel<object> _output;
private CancellationToken _cancellationToken;
public ChannelObserver(WritableChannel<object> output, CancellationToken cancellationToken)
{
_output = output;
_cancellationToken = cancellationToken;
}
public void OnCompleted()
{
_output.TryComplete();
}
public void OnError(Exception error)
{
_output.TryComplete(error);
}
public void OnNext(T value)
{
_cancellationToken.ThrowIfCancellationRequested();
// This will block the thread emitting the object if the channel is bounded and full
// I think this is OK, since we want to push the backpressure up. However, we may need
// to find a way to force the entire subscription off to a dedicated thread in order to
// ensure we don't block other tasks
// Right now however, we use unbounded channels, so all of the above is moot because TryWrite will always succeed
while (!_output.TryWrite(value))
{
// Wait for a spot
if (!_output.WaitToWriteAsync(_cancellationToken).Result)
{
// Channel was closed.
throw new InvalidOperationException("Output channel was closed");
}
}
}
}
private class BoxingEnumerator<T> : IAsyncEnumerator<object> where T : struct
{
private IAsyncEnumerator<T> _input;
public BoxingEnumerator(IAsyncEnumerator<T> input)
{
_input = input;
}
public object Current => _input.Current;
public Task<bool> MoveNextAsync() => _input.MoveNextAsync();
}
}
}

View File

@ -0,0 +1,21 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
namespace Microsoft.AspNetCore.SignalR.Internal
{
public static class TypeBaseEnumerationExtensions
{
public static IEnumerable<Type> AllBaseTypes(this Type type)
{
var current = type;
while (current != null)
{
yield return current;
current = current.BaseType;
}
}
}
}

View File

@ -0,0 +1,24 @@
using System.Collections.Generic;
namespace System.Threading.Tasks.Channels
{
internal static class ChannelExtensions
{
public static async Task<List<T>> ReadAllAsync<T>(this ReadableChannel<T> channel)
{
var list = new List<T>();
while (await channel.WaitToReadAsync())
{
while (channel.TryRead(out var item))
{
list.Add(item);
}
}
// Manifest any error from channel.Completion (which should be completed now)
await channel.Completion;
return list;
}
}
}

View File

@ -1,8 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.SignalR.Tests.Common;
@ -50,12 +52,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var connection = new HubConnection(new Uri("http://test/hubs"));
var connection = new HubConnection(new Uri("http://test/hubs"), loggerFactory);
try
{
await connection.StartAsync(TransportType.LongPolling, httpClient);
var result = await connection.Invoke<string>("HelloWorld");
var result = await connection.Invoke<string>(nameof(TestHub.HelloWorld));
Assert.Equal("Hello World!", result);
}
@ -74,12 +76,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var connection = new HubConnection(new Uri("http://test/hubs"));
var connection = new HubConnection(new Uri("http://test/hubs"), loggerFactory);
try
{
await connection.StartAsync(TransportType.LongPolling, httpClient);
var result = await connection.Invoke<string>("Echo", originalMessage);
var result = await connection.Invoke<string>(nameof(TestHub.Echo), originalMessage);
Assert.Equal(originalMessage, result);
}
@ -103,7 +105,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
await connection.StartAsync(TransportType.LongPolling, httpClient);
var result = await connection.Invoke<string>("echo", originalMessage);
var result = await connection.Invoke<string>(nameof(TestHub.Echo).ToLowerInvariant(), originalMessage);
Assert.Equal(originalMessage, result);
}
@ -122,7 +124,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var connection = new HubConnection(new Uri("http://test/hubs"));
var connection = new HubConnection(new Uri("http://test/hubs"), loggerFactory);
try
{
await connection.StartAsync(TransportType.LongPolling, httpClient);
@ -130,7 +132,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
var tcs = new TaskCompletionSource<string>();
connection.On<string>("Echo", tcs.SetResult);
await connection.Invoke("CallEcho", originalMessage).OrTimeout();
await connection.Invoke(nameof(TestHub.CallEcho), originalMessage).OrTimeout();
Assert.Equal(originalMessage, await tcs.Task.OrTimeout());
}
@ -141,6 +143,31 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
}
}
[Fact]
public async Task CanStreamClientMethodFromServer()
{
var loggerFactory = CreateLogger();
using (var httpClient = _testServer.CreateClient())
{
var connection = new HubConnection(new Uri("http://test/hubs"), loggerFactory);
try
{
await connection.StartAsync(TransportType.LongPolling, httpClient);
var tcs = new TaskCompletionSource<string>();
var results = await connection.Stream<string>(nameof(TestHub.Stream)).ReadAllAsync().OrTimeout();
Assert.Equal(new[] { "a", "b", "c" }, results.ToArray());
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}
[Fact]
public async Task ServerClosesConnectionIfHubMethodCannotBeResolved()
{
@ -148,7 +175,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var connection = new HubConnection(new Uri("http://test/hubs"));
var connection = new HubConnection(new Uri("http://test/hubs"), loggerFactory);
try
{
await connection.StartAsync(TransportType.LongPolling, httpClient);
@ -196,6 +223,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
await Clients.Client(Context.ConnectionId).InvokeAsync("Echo", message);
}
public IObservable<string> Stream()
{
return new[] { "a", "b", "c" }.ToObservable();
}
}
}
}

View File

@ -9,6 +9,7 @@
<ItemGroup>
<Compile Include="..\Common\TaskExtensions.cs" Link="TaskExtensions.cs" />
<Compile Include="..\Common\ChannelExtensions.cs" Link="ChannelExtensions.cs" />
</ItemGroup>
<ItemGroup>
@ -25,6 +26,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)" />
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="System.Reactive.Linq" Version="$(RxVersion)" />
</ItemGroup>
</Project>

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -128,7 +128,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(new JsonSerializer()), new LoggerFactory());
var closeTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e => closeTcs.TrySetException(e);
hubConnection.Closed += e =>
{
if (e == null)
{
closeTcs.TrySetResult(null);
}
else
{
closeTcs.TrySetException(e);
}
};
try
{
hubConnection.On<int>("Foo", r => { });
@ -137,7 +148,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await connection.ReceiveJsonMessage(
new
{
invocationId = "1",
invocationId = "1",
type = 1,
target = "Foo",
arguments = new object[] { 42, "42" }
@ -159,7 +170,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(new JsonSerializer()), new LoggerFactory());
var closeTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e => closeTcs.TrySetException(e);
hubConnection.Closed += e =>
{
if (e == null)
{
closeTcs.TrySetResult(null);
}
else
{
closeTcs.TrySetException(e);
}
};
try
{
hubConnection.On<int>("Foo", r => { });

View File

@ -3,6 +3,7 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.Extensions.Logging;
@ -38,6 +39,32 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task StreamSendsAnInvocationMessage()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(new JsonSerializer()), new LoggerFactory());
try
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<object>("Foo");
var invokeMessage = await connection.ReadSentTextMessageAsync().OrTimeout();
Assert.Equal("{\"invocationId\":\"1\",\"type\":1,\"target\":\"Foo\",\"arguments\":[]}", invokeMessage);
// Complete the channel
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();
await channel.Completion;
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task InvokeCompletedWhenCompletionMessageReceived()
{
@ -60,6 +87,28 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task StreamCompletesWhenCompletionMessageIsReceived()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(new JsonSerializer()), new LoggerFactory());
try
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<int>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();
Assert.Empty(await channel.ReadAllAsync());
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task InvokeYieldsResultWhenCompletionMessageReceived()
{
@ -82,6 +131,29 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task StreamFailsIfCompletionMessageHasPayload()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(new JsonSerializer()), new LoggerFactory());
try
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<string>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, result = "Oops" }).OrTimeout();
var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () => await channel.ReadAllAsync().OrTimeout());
Assert.Equal("Server provided a result in a completion response to a streamed invocation.", ex.Message);
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task InvokeFailsWithExceptionWhenCompletionWithErrorReceived()
{
@ -106,7 +178,29 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
[Fact]
// This will fail (intentionally) when we support streaming!
public async Task StreamFailsWithExceptionWhenCompletionWithErrorReceived()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(new JsonSerializer()), new LoggerFactory());
try
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<int>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, error = "An error occurred" }).OrTimeout();
var ex = await Assert.ThrowsAsync<HubException>(async () => await channel.ReadAllAsync().OrTimeout());
Assert.Equal("An error occurred", ex.Message);
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task InvokeFailsWithErrorWhenStreamingItemReceived()
{
var connection = new TestConnection();
@ -117,10 +211,37 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var invokeTask = hubConnection.Invoke<int>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, result = 42 }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = 42 }).OrTimeout();
var ex = await Assert.ThrowsAsync<NotSupportedException>(() => invokeTask).OrTimeout();
Assert.Equal("Streaming method results are not supported", ex.Message);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => invokeTask).OrTimeout();
Assert.Equal("Streaming methods must be invoked using HubConnection.Stream", ex.Message);
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task StreamYieldsItemsAsTheyArrive()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(new JsonSerializer()), new LoggerFactory());
try
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<string>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "2" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "3" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();
var notifications = await channel.ReadAllAsync().OrTimeout();
Assert.Equal(new[] { "1", "2", "3", }, notifications.ToArray());
}
finally
{

View File

@ -10,6 +10,7 @@
<ItemGroup>
<Compile Include="..\Common\TaskExtensions.cs" Link="TaskExtensions.cs" />
<Compile Include="..\Common\ArrayOutput.cs" Link="ArrayOutput.cs" />
<Compile Include="..\Common\ChannelExtensions.cs" Link="ChannelExtensions.cs" />
</ItemGroup>
<ItemGroup>

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -116,4 +116,4 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
}
}
}

View File

@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -24,15 +24,15 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
new object[] { new InvocationMessage("123", false, "Target", new CustomObject()), false, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":1,\"target\":\"Target\",\"arguments\":[{\"StringProp\":\"SignalR!\",\"DoubleProp\":6.2831853071,\"IntProp\":42,\"DateTimeProp\":\"2017-04-11T00:00:00\",\"NullProp\":null}]}" },
new object[] { new InvocationMessage("123", false, "Target", new CustomObject()), true, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":1,\"target\":\"Target\",\"arguments\":[{\"stringProp\":\"SignalR!\",\"doubleProp\":6.2831853071,\"intProp\":42,\"dateTimeProp\":\"2017-04-11T00:00:00\",\"nullProp\":null}]}" },
new object[] { new StreamItemMessage("123", 1), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"result\":1}" },
new object[] { new StreamItemMessage("123", "Foo"), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"result\":\"Foo\"}" },
new object[] { new StreamItemMessage("123", 2.0f), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"result\":2.0}" },
new object[] { new StreamItemMessage("123", true), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"result\":true}" },
new object[] { new StreamItemMessage("123", null), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"result\":null}" },
new object[] { new StreamItemMessage("123", new CustomObject()), false, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"result\":{\"StringProp\":\"SignalR!\",\"DoubleProp\":6.2831853071,\"IntProp\":42,\"DateTimeProp\":\"2017-04-11T00:00:00\"}}" },
new object[] { new StreamItemMessage("123", new CustomObject()), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"result\":{\"stringProp\":\"SignalR!\",\"doubleProp\":6.2831853071,\"intProp\":42,\"dateTimeProp\":\"2017-04-11T00:00:00\"}}" },
new object[] { new StreamItemMessage("123", new CustomObject()), false, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":2,\"result\":{\"StringProp\":\"SignalR!\",\"DoubleProp\":6.2831853071,\"IntProp\":42,\"DateTimeProp\":\"2017-04-11T00:00:00\",\"NullProp\":null}}" },
new object[] { new StreamItemMessage("123", new CustomObject()), true, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":2,\"result\":{\"stringProp\":\"SignalR!\",\"doubleProp\":6.2831853071,\"intProp\":42,\"dateTimeProp\":\"2017-04-11T00:00:00\",\"nullProp\":null}}" },
new object[] { new StreamItemMessage("123", 1), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"item\":1}" },
new object[] { new StreamItemMessage("123", "Foo"), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"item\":\"Foo\"}" },
new object[] { new StreamItemMessage("123", 2.0f), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"item\":2.0}" },
new object[] { new StreamItemMessage("123", true), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"item\":true}" },
new object[] { new StreamItemMessage("123", null), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"item\":null}" },
new object[] { new StreamItemMessage("123", new CustomObject()), false, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"item\":{\"StringProp\":\"SignalR!\",\"DoubleProp\":6.2831853071,\"IntProp\":42,\"DateTimeProp\":\"2017-04-11T00:00:00\"}}" },
new object[] { new StreamItemMessage("123", new CustomObject()), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":2,\"item\":{\"stringProp\":\"SignalR!\",\"doubleProp\":6.2831853071,\"intProp\":42,\"dateTimeProp\":\"2017-04-11T00:00:00\"}}" },
new object[] { new StreamItemMessage("123", new CustomObject()), false, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":2,\"item\":{\"StringProp\":\"SignalR!\",\"DoubleProp\":6.2831853071,\"IntProp\":42,\"DateTimeProp\":\"2017-04-11T00:00:00\",\"NullProp\":null}}" },
new object[] { new StreamItemMessage("123", new CustomObject()), true, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":2,\"item\":{\"stringProp\":\"SignalR!\",\"doubleProp\":6.2831853071,\"intProp\":42,\"dateTimeProp\":\"2017-04-11T00:00:00\",\"nullProp\":null}}" },
new object[] { CompletionMessage.WithResult("123", 1), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":3,\"result\":1}" },
new object[] { CompletionMessage.WithResult("123", "Foo"), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":3,\"result\":\"Foo\"}" },
@ -96,7 +96,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
[InlineData("{'type':2}", "Missing required property 'invocationId'.")]
[InlineData("{'type':2,'invocationId':42}", "Expected 'invocationId' to be of type String.")]
[InlineData("{'type':2,'invocationId':'42'}", "Missing required property 'result'.")]
[InlineData("{'type':2,'invocationId':'42'}", "Missing required property 'item'.")]
[InlineData("{'type':3}", "Missing required property 'invocationId'.")]
[InlineData("{'type':3,'invocationId':42}", "Expected 'invocationId' to be of type String.")]

View File

@ -0,0 +1,24 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
namespace Microsoft.AspNetCore.SignalR.Tests
{
internal class CancellationDisposable : IDisposable
{
private CancellationTokenSource _cts;
public CancellationDisposable(CancellationTokenSource cts)
{
_cts = cts;
}
public void Dispose()
{
_cts.Cancel();
_cts.Dispose();
}
}
}

View File

@ -2,7 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;
@ -21,7 +23,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var serviceProvider = CreateServiceProvider(s => s.AddSingleton(trackDispose));
var endPoint = serviceProvider.GetService<HubEndPoint<DisposeTrackingHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -51,7 +53,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<Hub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
@ -79,7 +81,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<OnConnectedThrowsHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
client.Dispose();
@ -103,7 +105,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<OnDisconnectedThrowsHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
client.Dispose();
@ -123,7 +125,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -146,7 +148,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -170,7 +172,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -192,7 +194,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -215,7 +217,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -237,7 +239,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -259,7 +261,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<InheritedHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -281,7 +283,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<InheritedHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -303,7 +305,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -341,8 +343,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var firstClient = new TestClient(serviceProvider))
using (var secondClient = new TestClient(serviceProvider))
using (var firstClient = new TestClient())
using (var secondClient = new TestClient())
{
var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection);
var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection);
@ -376,8 +378,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var firstClient = new TestClient(serviceProvider))
using (var secondClient = new TestClient(serviceProvider))
using (var firstClient = new TestClient())
using (var secondClient = new TestClient())
{
var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection);
var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection);
@ -418,7 +420,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(serviceProvider))
using (var client = new TestClient())
{
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -438,8 +440,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var firstClient = new TestClient(serviceProvider))
using (var secondClient = new TestClient(serviceProvider))
using (var firstClient = new TestClient())
using (var secondClient = new TestClient())
{
var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection);
var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection);
@ -470,8 +472,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var firstClient = new TestClient(serviceProvider))
using (var secondClient = new TestClient(serviceProvider))
using (var firstClient = new TestClient())
using (var secondClient = new TestClient())
{
var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection);
var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection);
@ -495,6 +497,62 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Theory]
[InlineData(nameof(StreamingHub.CounterChannel))]
[InlineData(nameof(StreamingHub.CounterObservable))]
public async Task HubsCanStreamResponses(string method)
{
var serviceProvider = CreateServiceProvider();
var endPoint = serviceProvider.GetService<HubEndPoint<StreamingHub>>();
using (var client = new TestClient())
{
var endPointLifetime = endPoint.OnConnectedAsync(client.Connection);
await client.Connected.OrTimeout();
var messages = await client.StreamAsync(method, 4).OrTimeout();
Assert.Equal(5, messages.Count);
AssertHubMessage(new StreamItemMessage(string.Empty, "0"), messages[0]);
AssertHubMessage(new StreamItemMessage(string.Empty, "1"), messages[1]);
AssertHubMessage(new StreamItemMessage(string.Empty, "2"), messages[2]);
AssertHubMessage(new StreamItemMessage(string.Empty, "3"), messages[3]);
AssertHubMessage(new CompletionMessage(string.Empty, error: null, result: null, hasResult: false), messages[4]);
client.Dispose();
await endPointLifetime;
}
}
private static void AssertHubMessage(HubMessage expected, HubMessage actual)
{
// We aren't testing InvocationIds here
switch (expected)
{
case CompletionMessage expectedCompletion:
var actualCompletion = Assert.IsType<CompletionMessage>(actual);
Assert.Equal(expectedCompletion.Error, actualCompletion.Error);
Assert.Equal(expectedCompletion.HasResult, actualCompletion.HasResult);
Assert.Equal(expectedCompletion.Result, actualCompletion.Result);
break;
case StreamItemMessage expectedStreamItem:
var actualStreamItem = Assert.IsType<StreamItemMessage>(actual);
Assert.Equal(expectedStreamItem.Item, actualStreamItem.Item);
break;
case InvocationMessage expectedInvocation:
var actualInvocation = Assert.IsType<InvocationMessage>(actual);
Assert.Equal(expectedInvocation.NonBlocking, actualInvocation.NonBlocking);
Assert.Equal(expectedInvocation.Target, actualInvocation.Target);
Assert.Equal(expectedInvocation.Arguments, actualInvocation.Arguments);
break;
default:
throw new InvalidOperationException($"Unsupported Hub Message type {expected.GetType()}");
}
}
private static Type GetEndPointType(Type hubType)
{
var endPointType = typeof(HubEndPoint<>);
@ -518,6 +576,55 @@ namespace Microsoft.AspNetCore.SignalR.Tests
return services.BuildServiceProvider();
}
public class StreamingHub : TestHub
{
public IObservable<string> CounterObservable(int count)
{
return new CountingObservable(count);
}
public ReadableChannel<string> CounterChannel(int count)
{
var channel = Channel.CreateUnbounded<string>();
var task = Task.Run(async () =>
{
for (int i = 0; i < count; i++)
{
await channel.Out.WriteAsync(i.ToString());
}
channel.Out.Complete();
});
return channel.In;
}
private class CountingObservable : IObservable<string>
{
private int _count;
public CountingObservable(int count)
{
_count = count;
}
public IDisposable Subscribe(IObserver<string> observer)
{
var cts = new CancellationTokenSource();
Task.Run(() =>
{
for (int i = 0; !cts.Token.IsCancellationRequested && i < _count; i++)
{
observer.OnNext(i.ToString());
}
observer.OnCompleted();
});
return new CancellationDisposable(cts);
}
}
}
public class OnConnectedThrowsHub : Hub
{
public override Task OnConnectedAsync()
@ -528,7 +635,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
public class OnDisconnectedThrowsHub : Hub
public class OnDisconnectedThrowsHub : TestHub
{
public override Task OnDisconnectedAsync(Exception exception)
{
@ -538,14 +645,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
private class MethodHub : Hub
private class MethodHub : TestHub
{
public override Task OnConnectedAsync()
{
Context.Connection.Metadata.Get<TaskCompletionSource<bool>>("ConnectedTask").SetResult(true);
return base.OnConnectedAsync();
}
public Task GroupRemoveMethod(string groupName)
{
return Groups.RemoveAsync(groupName);
@ -624,7 +725,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
private class BaseHub : Hub
private class BaseHub : TestHub
{
public string BaseMethod(string message)
{
@ -637,7 +738,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
private class InvalidHub : Hub
private class InvalidHub : TestHub
{
public void OverloadedMethod(int num)
{
@ -648,7 +749,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
private class DisposeTrackingHub : Hub
private class DisposeTrackingHub : TestHub
{
private TrackDispose _trackDispose;
@ -670,5 +771,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
public int DisposeCount = 0;
}
public abstract class TestHub : Hub
{
public override Task OnConnectedAsync()
{
Context.Connection.Metadata.Get<TaskCompletionSource<bool>>("ConnectedTask")?.TrySetResult(true);
return base.OnConnectedAsync();
}
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -21,11 +21,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
private IHubProtocol _protocol;
private CancellationTokenSource _cts;
public ConnectionContext Connection;
public ConnectionContext Connection { get; }
public IChannelConnection<Message> Application { get; }
public Task Connected => Connection.Metadata.Get<TaskCompletionSource<bool>>("ConnectedTask").Task;
public TestClient(IServiceProvider serviceProvider)
public TestClient()
{
var transportToApplication = Channel.CreateUnbounded<Message>();
var applicationToTransport = Channel.CreateUnbounded<Message>();
@ -42,6 +42,39 @@ namespace Microsoft.AspNetCore.SignalR.Tests
_cts = new CancellationTokenSource();
}
public async Task<IList<HubMessage>> StreamAsync(string methodName, params object[] args)
{
var invocationId = await SendInvocationAsync(methodName, args);
var messages = new List<HubMessage>();
while (true)
{
var message = await Read();
if (!string.Equals(message.InvocationId, invocationId))
{
throw new NotSupportedException("TestClient does not support multiple outgoing invocations!");
}
if (message == null)
{
throw new InvalidOperationException("Connection aborted!");
}
switch (message)
{
case StreamItemMessage _:
messages.Add(message);
break;
case CompletionMessage _:
messages.Add(message);
return messages;
default:
throw new NotSupportedException("TestClient does not support receiving invocations!");
}
}
}
public async Task<CompletionMessage> InvokeAsync(string methodName, params object[] args)
{
var invocationId = await SendInvocationAsync(methodName, args);
@ -63,7 +96,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
switch (message)
{
case StreamItemMessage result:
throw new NotSupportedException("TestClient does not support streaming!");
throw new NotSupportedException("Use 'StreamAsync' to call a streaming method");
case CompletionMessage completion:
return completion;
default: