From 34bb352189c653907796194836f72c36e32ef701 Mon Sep 17 00:00:00 2001 From: DylanDmitri Date: Mon, 4 Jun 2018 10:33:31 -0700 Subject: [PATCH] streaming from client to server --- clients/ts/signalr/src/UploadStream.ts | 27 ++++ .../wwwroot/channelParameters.html | 128 ++++++++++++++++++ .../clients/ts/signalr/src/HubConnection.ts | 48 +++++-- .../clients/ts/signalr/src/IHubProtocol.ts | 42 ++++-- .../ts/signalr/tests/HubConnection.test.ts | 33 +++++ .../samples/ClientSample/UploadSample.cs | 116 ++++++++++++---- .../samples/SignalRSamples/Hubs/UploadHub.cs | 90 ++++++------ .../samples/SignalRSamples/wwwroot/index.html | 8 +- src/SignalR/src/Common/ReflectionHelper.cs | 2 +- .../HubConnection.cs | 7 +- .../Internal/DefaultHubDispatcher.cs | 4 - .../HubConnectionTests.cs | 8 +- .../Hubs.cs | 17 +++ .../HubConnectionHandlerTestUtils/Hubs.cs | 20 +++ .../HubConnectionHandlerTests.cs | 43 +++++- 15 files changed, 484 insertions(+), 109 deletions(-) create mode 100644 clients/ts/signalr/src/UploadStream.ts create mode 100644 samples/SignalRSamples/wwwroot/channelParameters.html diff --git a/clients/ts/signalr/src/UploadStream.ts b/clients/ts/signalr/src/UploadStream.ts new file mode 100644 index 0000000000..eb1deafc36 --- /dev/null +++ b/clients/ts/signalr/src/UploadStream.ts @@ -0,0 +1,27 @@ +import { HubConnection } from "./HubConnection"; +import { MessageType } from "./IHubProtocol"; + +export class UploadStream { + private connection: HubConnection; + + public readonly streamId: string; + public readonly placeholder: object; + + constructor(connection: HubConnection) { + this.connection = connection; + this.streamId = connection.nextStreamId(); + this.placeholder = {streamId: this.streamId}; + } + + public write(item: any): Promise { + return this.connection.sendWithProtocol(this.connection.createStreamDataMessage(this.streamId, item)); + } + + public complete(error?: string): Promise { + if (error) { + return this.connection.sendWithProtocol({ type: MessageType.StreamComplete, streamId: this.streamId, error }); + } else { + return this.connection.sendWithProtocol({ type: MessageType.StreamComplete, streamId: this.streamId }); + } + } +} diff --git a/samples/SignalRSamples/wwwroot/channelParameters.html b/samples/SignalRSamples/wwwroot/channelParameters.html new file mode 100644 index 0000000000..464186a670 --- /dev/null +++ b/samples/SignalRSamples/wwwroot/channelParameters.html @@ -0,0 +1,128 @@ + + + + + + + + + +

Streaming Parameters

+

Unknown Transport

+ +

Controls

+
+ + + +
+ +
+ + +
+ +

Results

+
    + +
      + + + + + + diff --git a/src/SignalR/clients/ts/signalr/src/HubConnection.ts b/src/SignalR/clients/ts/signalr/src/HubConnection.ts index 2ec156a7da..e0b73b5fe5 100644 --- a/src/SignalR/clients/ts/signalr/src/HubConnection.ts +++ b/src/SignalR/clients/ts/signalr/src/HubConnection.ts @@ -1,9 +1,11 @@ +import { UploadStream } from "./UploadStream"; + // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. import { HandshakeProtocol, HandshakeRequestMessage, HandshakeResponseMessage } from "./HandshakeProtocol"; import { IConnection } from "./IConnection"; -import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol"; +import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamDataMessage, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol"; import { ILogger, LogLevel } from "./ILogger"; import { IStreamResult } from "./Stream"; import { Arg, Subject } from "./Utils"; @@ -29,6 +31,7 @@ export class HubConnection { private callbacks: { [invocationId: string]: (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => void }; private methods: { [name: string]: Array<(...args: any[]) => void> }; private id: number; + private streamId: number; private closedCallbacks: Array<(error?: Error) => void>; private receivedHandshakeResponse: boolean; private handshakeResolver!: (value?: PromiseLike<{}>) => void; @@ -84,6 +87,7 @@ export class HubConnection { this.methods = {}; this.closedCallbacks = []; this.id = 0; + this.streamId = 0; this.receivedHandshakeResponse = false; this.connectionState = HubConnectionState.Disconnected; @@ -122,7 +126,7 @@ export class HubConnection { this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`); - // defensively cleanup timeout in case we receive a message from the server before we finish start + // defensively cleanup timeout in case we receive a message export from the server before we finish start this.cleanupTimeout(); this.resetTimeoutPeriod(); this.resetKeepAliveInterval(); @@ -156,11 +160,10 @@ export class HubConnection { const subject = new Subject(() => { const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId); - const cancelMessage: any = this.protocol.writeMessage(cancelInvocation); delete this.callbacks[invocationDescriptor.invocationId]; - return this.sendMessage(cancelMessage); + return this.sendWithProtocol(cancelInvocation); }); this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => { @@ -181,9 +184,7 @@ export class HubConnection { } }; - const message = this.protocol.writeMessage(invocationDescriptor); - - this.sendMessage(message) + this.sendWithProtocol(invocationDescriptor) .catch((e) => { subject.error(e); delete this.callbacks[invocationDescriptor.invocationId]; @@ -197,6 +198,14 @@ export class HubConnection { return this.connection.send(message); } + /** + * Sends a js object to the server. + * @param message The js object to serialize and send. + */ + public sendWithProtocol(message: any) { + return this.sendMessage(this.protocol.writeMessage(message)); + } + /** Invokes a hub method on the server using the specified name and arguments. Does not wait for a response from the receiver. * * The Promise returned by this method resolves when the client has sent the invocation to the server. The server may still @@ -207,11 +216,16 @@ export class HubConnection { * @returns {Promise} A Promise that resolves when the invocation has been successfully sent, or rejects with an error. */ public send(methodName: string, ...args: any[]): Promise { - const invocationDescriptor = this.createInvocation(methodName, args, true); + return this.sendWithProtocol(this.createInvocation(methodName, args, true)); + } - const message = this.protocol.writeMessage(invocationDescriptor); + public nextStreamId(): string { + this.streamId += 1; + return this.streamId.toString(); + } - return this.sendMessage(message); + public newUploadStream(): UploadStream { + return new UploadStream(this); } /** Invokes a hub method on the server using the specified name and arguments. @@ -229,7 +243,7 @@ export class HubConnection { const invocationDescriptor = this.createInvocation(methodName, args, false); const p = new Promise((resolve, reject) => { - // invocationId will always have a value for a non-blocking invocation + // invocationId will always have a value for a non-blocking inexport vocation this.callbacks[invocationDescriptor.invocationId!] = (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => { if (error) { reject(error); @@ -248,9 +262,7 @@ export class HubConnection { } }; - const message = this.protocol.writeMessage(invocationDescriptor); - - this.sendMessage(message) + this.sendWithProtocol(invocationDescriptor) .catch((e) => { reject(e); // invocationId will always have a value for a non-blocking invocation @@ -538,4 +550,12 @@ export class HubConnection { type: MessageType.CancelInvocation, }; } + + public createStreamDataMessage(id: string, item: any): StreamDataMessage { + return { + item, + streamId: id, + type: MessageType.StreamData, + }; + } } diff --git a/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts b/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts index e771e399e1..d7c45eec19 100644 --- a/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts +++ b/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts @@ -20,6 +20,10 @@ export enum MessageType { Ping = 6, /** Indicates the message is a Close message and implements the {@link @aspnet/signalr.CloseMessage} interface. */ Close = 7, + /** Indicates the message is a StreamComplete message and implements the {@link StreamCompleteMessage} interface */ + StreamComplete = 8, + /** Indicates the message is a ParamterStreaming message and implements the {@link StreamDataMessage} interface */ + StreamData = 9, } /** Defines a dictionary of string keys and string values representing headers attached to a Hub message. */ @@ -29,14 +33,14 @@ export interface MessageHeaders { } /** Union type of all known Hub messages. */ -export type HubMessage = - InvocationMessage | - StreamInvocationMessage | - StreamItemMessage | - CompletionMessage | - CancelInvocationMessage | - PingMessage | - CloseMessage; +export type HubMessage = InvocationMessage + | StreamInvocationMessage + | StreamItemMessage + | CompletionMessage + | CancelInvocationMessage + | PingMessage + | CloseMessage + | StreamDataMessage; /** Defines properties common to all Hub messages. */ export interface HubMessageBase { @@ -91,6 +95,18 @@ export interface StreamItemMessage extends HubInvocationMessage { readonly item?: any; } +/** A hub message representing a single stream item, transferred through a streaming parameter. */ +export interface StreamDataMessage extends HubMessageBase { + /** @inheritDoc */ + readonly type: MessageType.StreamData; + + /** The streamId */ + readonly streamId: string; + + /** The item produced by the client */ + readonly item?: any; +} + /** A hub message representing the result of an invocation. */ export interface CompletionMessage extends HubInvocationMessage { /** @inheritDoc */ @@ -137,6 +153,16 @@ export interface CancelInvocationMessage extends HubInvocationMessage { readonly invocationId: string; } +/** A hub message send to indicate the end of stream items for a streaming parameter. */ +export interface StreamCompleteMessage extends HubMessageBase { + /** @inheritDoc */ + readonly type: MessageType.StreamComplete; + /** The stream ID of the stream to be completed. */ + readonly streamId: string; + /** The error that trigger completion, if any. */ + readonly error?: string; +} + /** A protocol abstraction for communicating with SignalR Hubs. */ export interface IHubProtocol { /** The name of the protocol. This is used by SignalR to resolve the protocol between the client and server. */ diff --git a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts index 04b889b59d..85cd6e107d 100644 --- a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts @@ -330,6 +330,39 @@ describe("HubConnection", () => { }); }); + it("is able to send stream items to server", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + const hubConnection = createHubConnection(connection, logger); + try { + connection.receiveHandshakeResponse(); + + const stream = hubConnection.newUploadStream(); + const invokePromise = hubConnection.invoke("testMethod", "arg", stream.placeholder); + + expect(JSON.parse(connection.sentData[0])).toEqual({ + arguments: ["arg", {streamId: "1"}], + invocationId: "0", + target: "testMethod", + type: MessageType.Invocation, + }); + + await stream.write("item numero uno"); + expect(JSON.parse(connection.sentData[1])).toEqual({ + item: "item numero uno", + streamId: "1", + type: MessageType.StreamData, + }); + + connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" }); + + expect(await invokePromise).toBe("foo"); + } finally { + await hubConnection.stop(); + } + }); + }); + it("completes pending invocations when stopped", async () => { await VerifyLogger.run(async (logger) => { const connection = new TestConnection(); diff --git a/src/SignalR/samples/ClientSample/UploadSample.cs b/src/SignalR/samples/ClientSample/UploadSample.cs index b1b173b746..a6ce78f7ca 100644 --- a/src/SignalR/samples/ClientSample/UploadSample.cs +++ b/src/SignalR/samples/ClientSample/UploadSample.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR.Client; @@ -18,7 +19,7 @@ namespace ClientSample { cmd.Description = "Tests a streaming invocation from client to hub"; - var baseUrlArgument = cmd.Argument("", "The URL to the Chat Hub to test"); + CommandArgument baseUrlArgument = cmd.Argument("", "The URL to the Chat Hub to test"); cmd.OnExecute(() => ExecuteAsync(baseUrlArgument.Value)); }); @@ -31,9 +32,10 @@ namespace ClientSample .Build(); await connection.StartAsync(); - await BasicInvoke(connection); - //await MultiParamInvoke(connection); - //await AdditionalArgs(connection); + //await BasicInvoke(connection); + //await ScoreTrackerExample(connection); + //await FileUploadExample(connection); + await StreamingEcho(connection); return 0; } @@ -46,6 +48,7 @@ namespace ClientSample foreach (var c in "hello") { await channel.Writer.WriteAsync(c.ToString()); + await Task.Delay(1000); } channel.Writer.TryComplete(); @@ -53,37 +56,96 @@ namespace ClientSample Debug.WriteLine($"You message was: {result}"); } - private static async Task WriteStreamAsync(IEnumerable sequence, ChannelWriter writer) + public static async Task ScoreTrackerExample(HubConnection connection) { - foreach (T element in sequence) - { - await writer.WriteAsync(element); - await Task.Delay(100); - } + // Andrew please add the updated code from your laptop here - writer.TryComplete(); - } - - public static async Task MultiParamInvoke(HubConnection connection) - { - var letters = Channel.CreateUnbounded(); - var numbers = Channel.CreateUnbounded(); - - _ = WriteStreamAsync(new[] { "h", "i", "!" }, letters.Writer); - _ = WriteStreamAsync(new[] { 1, 2, 3, 4, 5 }, numbers.Writer); - - var result = await connection.InvokeAsync("DoubleStreamUpload", letters.Reader, numbers.Reader); + var channel_one = Channel.CreateBounded(2); + var channel_two = Channel.CreateBounded(2); + _ = WriteItemsAsync(channel_one.Writer, new[] { 2, 2, 3 }); + _ = WriteItemsAsync(channel_two.Writer, new[] { -2, 5, 3 }); + var result = await connection.InvokeAsync("ScoreTracker", channel_one.Reader, channel_two.Reader); Debug.WriteLine(result); + + + async Task WriteItemsAsync(ChannelWriter source, IEnumerable scores) + { + await Task.Delay(1000); + foreach (var c in scores) + { + await source.WriteAsync(c); + await Task.Delay(250); + } + + // tryComplete triggers the end of this upload's relayLoop + // which sends a StreamComplete to the server + source.TryComplete(); + } } - public static async Task AdditionalArgs(HubConnection connection) + public static async Task FileUploadExample(HubConnection connection) { - var channel = Channel.CreateUnbounded(); - _ = WriteStreamAsync("main message".ToCharArray(), channel.Writer); + var fileNameSource = @"C:\Users\t-dygray\Pictures\weeg.jpg"; + var fileNameDest = @"C:\Users\t-dygray\Pictures\TargetFolder\weeg.jpg"; - var result = await connection.InvokeAsync("UploadWithSuffix", channel.Reader, " + wooh I'm a suffix"); - Debug.WriteLine($"Your message was: {result}"); + var channel = Channel.CreateUnbounded(); + var invocation = connection.InvokeAsync("UploadFile", fileNameDest, channel.Reader); + + using (var file = new FileStream(fileNameSource, FileMode.Open, FileAccess.Read)) + { + foreach (var chunk in GetChunks(file, kilobytesPerChunk: 5)) + { + await channel.Writer.WriteAsync(chunk); + } + } + channel.Writer.TryComplete(); + + Debug.WriteLine(await invocation); + } + + public static IEnumerable GetChunks(FileStream fileStream, double kilobytesPerChunk) + { + var chunkSize = (int)kilobytesPerChunk * 1024; + + var position = 0; + while (true) + { + if (position + chunkSize > fileStream.Length) + { + var lastChunk = new byte[fileStream.Length - position]; + fileStream.Read(lastChunk, 0, lastChunk.Length); + yield return lastChunk; + break; + } + + var chunk = new byte[chunkSize]; + position += fileStream.Read(chunk, 0, chunk.Length); + yield return chunk; + } + } + + public static async Task StreamingEcho(HubConnection connection) + { + var channel = Channel.CreateUnbounded(); + + _ = Task.Run(async () => + { + foreach (var phrase in new[] { "one fish", "two fish", "red fish", "blue fish" }) + { + await channel.Writer.WriteAsync(phrase); + } + }); + + var outputs = await connection.StreamAsChannelAsync("StreamEcho", channel.Reader); + + while (await outputs.WaitToReadAsync()) + { + while (outputs.TryRead(out var item)) + { + Debug.WriteLine($"received '{item}'."); + } + } } } } diff --git a/src/SignalR/samples/SignalRSamples/Hubs/UploadHub.cs b/src/SignalR/samples/SignalRSamples/Hubs/UploadHub.cs index bd821b5066..a0d5a912ea 100644 --- a/src/SignalR/samples/SignalRSamples/Hubs/UploadHub.cs +++ b/src/SignalR/samples/SignalRSamples/Hubs/UploadHub.cs @@ -2,10 +2,12 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; @@ -14,38 +16,10 @@ namespace SignalRSamples.Hubs { public class UploadHub : Hub { - public async Task DoubleStreamUpload(ChannelReader letters, ChannelReader numbers) - { - var total = await Sum(numbers); - var word = await UploadWord(letters); - return string.Format("You sent over <{0}> <{1}s>", total, word); - } - - public async Task Sum(ChannelReader source) + public string Echo(string word) { - var total = 0; - while (await source.WaitToReadAsync()) - { - while (source.TryRead(out var item)) - { - total += item; - } - } - return total; - } - - public async Task LocalSum(ChannelReader source) - { - var total = 0; - while (await source.WaitToReadAsync()) - { - while (source.TryRead(out var item)) - { - total += item; - } - } - Debug.WriteLine(String.Format("Complete, your total is <{0}>.", total)); + return "Echo: " + word; } public async Task UploadWord(ChannelReader source) @@ -55,7 +29,7 @@ namespace SignalRSamples.Hubs // receiving a StreamCompleteMessage should cause this WaitToRead to return false while (await source.WaitToReadAsync()) { - while (source.TryRead(out var item)) + while (source.TryRead(out string item)) { Debug.WriteLine($"received: {item}"); Console.WriteLine($"received: {item}"); @@ -67,29 +41,35 @@ namespace SignalRSamples.Hubs return sb.ToString(); } - public async Task UploadWithSuffix(ChannelReader source, string suffix) + public async Task ScoreTracker(ChannelReader player1, ChannelReader player2) { - var sb = new StringBuilder(); + var p1score = await Loop(player1); + var p2score = await Loop(player2); - while (await source.WaitToReadAsync()) + var winner = p1score > p2score ? "p1" : "p2"; + return $"{winner} wins with a total of {Math.Max(p1score, p2score)} points to {Math.Min(p1score, p2score)}"; + + async Task Loop(ChannelReader reader) { - while (source.TryRead(out var item)) + var score = 0; + + while (await reader.WaitToReadAsync()) { - await Task.Delay(50); - Debug.WriteLine($"received: {item}"); - sb.Append(item); + while (reader.TryRead(out int item)) + { + Debug.WriteLine($"got score {item}"); + score += item; + } } + + return score; } - - sb.Append(suffix); - - return sb.ToString(); } - public async Task UploadFile(ChannelReader source, string filepath) + public async Task UploadFile(string filepath, ChannelReader source) { var result = Enumerable.Empty(); - int chunk = 1; + var chunk = 1; while (await source.WaitToReadAsync()) { @@ -102,9 +82,27 @@ namespace SignalRSamples.Hubs } File.WriteAllBytes(filepath, result.ToArray()); + } - Debug.WriteLine("returning status code"); - return $"file written to '{filepath}'"; + public ChannelReader StreamEcho(ChannelReader source) + { + var output = Channel.CreateUnbounded(); + + _ = Task.Run(async () => + { + while (await source.WaitToReadAsync()) + { + while (source.TryRead(out var item)) + { + Debug.WriteLine($"Echoing '{item}'."); + await output.Writer.WriteAsync("echo:" + item); + } + } + output.Writer.Complete(); + + }); + + return output.Reader; } } } diff --git a/src/SignalR/samples/SignalRSamples/wwwroot/index.html b/src/SignalR/samples/SignalRSamples/wwwroot/index.html index 364424e20d..afffefdd59 100644 --- a/src/SignalR/samples/SignalRSamples/wwwroot/index.html +++ b/src/SignalR/samples/SignalRSamples/wwwroot/index.html @@ -1,4 +1,4 @@ - + @@ -22,5 +22,11 @@
    • Server Sent Events
    • Web Sockets
    • +

      ASP.NET Core SignalR (Parameter Streaming via Channels)

      + +
    • Upload streaming via Long polling
    • +
    • Upload streaming via SSE
    • +
    • Upload streaming via Websockets
    • +
      diff --git a/src/SignalR/src/Common/ReflectionHelper.cs b/src/SignalR/src/Common/ReflectionHelper.cs index eb11f635c1..654371b97a 100644 --- a/src/SignalR/src/Common/ReflectionHelper.cs +++ b/src/SignalR/src/Common/ReflectionHelper.cs @@ -26,7 +26,7 @@ namespace Microsoft.AspNetCore.SignalR // TODO #2594 - add Streams here, to make sending files easy while (type != null) { - if (type.GetGenericTypeDefinition() == typeof(ChannelReader<>)) + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(ChannelReader<>)) { return true; } diff --git a/src/SignalR/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs b/src/SignalR/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs index 9ee9fabd63..46ac5c4ca6 100644 --- a/src/SignalR/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs +++ b/src/SignalR/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs @@ -420,6 +420,8 @@ namespace Microsoft.AspNetCore.SignalR.Client irq.Dispose(); } + var readers = PackageStreamingParams(args); + CheckDisposed(); await WaitConnectionLockAsync(); @@ -444,12 +446,14 @@ namespace Microsoft.AspNetCore.SignalR.Client ReleaseConnectionLock(); } + LaunchStreams(readers, cancellationToken); + return channel; } private Dictionary PackageStreamingParams(object[] args) { - // lazy initialized, to avoid allocation unecessary dictionaries + // lazy initialized, to avoid allocating unecessary dictionaries Dictionary readers = null; for (var i = 0; i < args.Length; i++) @@ -621,7 +625,6 @@ namespace Microsoft.AspNetCore.SignalR.Client var readers = PackageStreamingParams(args); Log.PreparingNonBlockingInvocation(_logger, methodName, args.Length); - var invocationMessage = new InvocationMessage(null, methodName, args); await SendWithLock(invocationMessage, callerName: nameof(SendCoreAsync)); diff --git a/src/SignalR/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs b/src/SignalR/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs index 6d78877e09..af77e90e72 100644 --- a/src/SignalR/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs @@ -176,10 +176,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal else { bool isStreamCall = descriptor.HasStreamingParameters; - if (isStreamResponse && isStreamCall) - { - throw new NotSupportedException("Streaming responses for streaming uploads are not supported."); - } return Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamResponse, isStreamCall); } } diff --git a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs index 656496e1d1..efcc5f1a13 100644 --- a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs +++ b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs @@ -882,8 +882,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests try { await hubConnection.StartAsync().OrTimeout(); - var headerValues = await hubConnection.InvokeAsync(nameof(TestHub.GetHeaderValues), new[] {"X-test", "X-42"}).OrTimeout(); - Assert.Equal(new[] {"42", "test"}, headerValues); + var headerValues = await hubConnection.InvokeAsync(nameof(TestHub.GetHeaderValues), new[] { "X-test", "X-42" }).OrTimeout(); + Assert.Equal(new[] { "42", "test" }, headerValues); } catch (Exception ex) { @@ -946,8 +946,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests await hubConnection.StartAsync().OrTimeout(); var features = await hubConnection.InvokeAsync(nameof(TestHub.GetIHttpConnectionFeatureProperties)).OrTimeout(); - var localPort = (Int64)features[0]; - var remotePort = (Int64)features[1]; + var localPort = (long)features[0]; + var remotePort = (long)features[1]; var localIP = (string)features[2]; var remoteIP = (string)features[3]; diff --git a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs index e79babd306..16e43f36e0 100644 --- a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs +++ b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs @@ -131,6 +131,23 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests { await Clients.Client(Context.ConnectionId).NoClientHandler(); } + + public ChannelReader IncrementEach(ChannelReader source) + { + var output = Channel.CreateUnbounded(); + _ = Task.Run(async () => { + while (await source.WaitToReadAsync()) + { + while (source.TryRead(out var item)) + { + await output.Writer.WriteAsync(item + 1); + } + } + output.Writer.TryComplete(); + }); + + return output.Reader; + } } internal static class TestHubMethodsImpl diff --git a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTestUtils/Hubs.cs b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTestUtils/Hubs.cs index 5e08a5f501..1d05e22e01 100644 --- a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTestUtils/Hubs.cs +++ b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTestUtils/Hubs.cs @@ -571,6 +571,26 @@ namespace Microsoft.AspNetCore.SignalR.Tests { return 42; } + + public ChannelReader StreamEcho(ChannelReader source) + { + Channel output = Channel.CreateUnbounded(); + + _ = Task.Run(async () => + { + while (await source.WaitToReadAsync()) + { + while (source.TryRead(out string item)) + { + await output.Writer.WriteAsync("echo:" + item); + } + } + + output.Writer.TryComplete(); + }); + + return output.Reader; + } } public class SimpleHub : Hub diff --git a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs index 687c0b5ee7..679aa4f84c 100644 --- a/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs +++ b/src/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs @@ -2922,13 +2922,12 @@ namespace Microsoft.AspNetCore.SignalR.Tests [Fact] public async Task UploadStreamCompleteWithError() { - var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(); var connectionHandler = serviceProvider.GetService>(); using (var client = new TestClient()) { - var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); + await client.ConnectAsync(connectionHandler).OrTimeout(); await client.BeginUploadStreamAsync("invocation", nameof(MethodHub.TestCustomErrorPassing), new StreamPlaceholder("id")).OrTimeout(); await client.SendHubMessageAsync(new StreamCompleteMessage("id", CustomErrorMessage)).OrTimeout(); @@ -3041,6 +3040,46 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } + [Fact] + public async Task CanPassStreamingParameterToStreamHubMethod() + { + IServiceProvider serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(); + HubConnectionHandler connectionHandler = serviceProvider.GetService>(); + Mock invocationBinder = new Mock(); + invocationBinder.Setup(b => b.GetStreamItemType(It.IsAny())).Returns(typeof(string)); + + using (TestClient client = new TestClient(invocationBinder: invocationBinder.Object)) + { + Task connectionHandlerTask = await client.ConnectAsync(connectionHandler); + + // Wait for a connection, or for the endpoint to fail. + await client.Connected.OrThrowIfOtherFails(connectionHandlerTask).OrTimeout(); + + var streamId = "sample_id"; + var messagePromise = client.StreamAsync("StreamEcho", new StreamPlaceholder(streamId)).OrTimeout(); + + var phrases = new[] { "asdf", "qwer", "zxcv" }; + foreach (var phrase in phrases) + { + await client.SendHubMessageAsync(new StreamDataMessage(streamId, phrase)); + } + await client.SendHubMessageAsync(new StreamCompleteMessage(streamId)); + + var messages = await messagePromise; + + // add one because this includes the completion + Assert.Equal(phrases.Count() + 1, messages.Count); + for (var i = 0; i < phrases.Count(); i++) + { + Assert.Equal("echo:" + phrases[i], ((StreamItemMessage)messages[i]).Item); + } + + client.Dispose(); + + await connectionHandlerTask.OrTimeout(); + } + } + private class CustomHubActivator : IHubActivator where THub : Hub { public int ReleaseCount;