streaming from client to server

This commit is contained in:
DylanDmitri 2018-06-04 10:33:31 -07:00 committed by BrennanConroy
parent 24e17eadca
commit 34bb352189
15 changed files with 484 additions and 109 deletions

View File

@ -0,0 +1,27 @@
import { HubConnection } from "./HubConnection";
import { MessageType } from "./IHubProtocol";
export class UploadStream {
private connection: HubConnection;
public readonly streamId: string;
public readonly placeholder: object;
constructor(connection: HubConnection) {
this.connection = connection;
this.streamId = connection.nextStreamId();
this.placeholder = {streamId: this.streamId};
}
public write(item: any): Promise<void> {
return this.connection.sendWithProtocol(this.connection.createStreamDataMessage(this.streamId, item));
}
public complete(error?: string): Promise<void> {
if (error) {
return this.connection.sendWithProtocol({ type: MessageType.StreamComplete, streamId: this.streamId, error });
} else {
return this.connection.sendWithProtocol({ type: MessageType.StreamComplete, streamId: this.streamId });
}
}
}

View File

@ -0,0 +1,128 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
</head>
<body>
<h1> Streaming Parameters </h1>
<h1 id="transportName">Unknown Transport</h1>
<h2>Controls</h2>
<div>
<button id="connectButton" type="button">Connect</button>
<button id="disconnectButton" type="button" disabled>Disconnect</button>
<button id="clearButton" type="button">Clear</button>
</div>
<div>
<button id="sumButton" name="sum" type="button" disabled>Local Sum</button>
<button id="channelButton" name="channel" type="button" disabled>From Channel</button>
</div>
<h2>Results</h2>
<ul id="resultsList"></ul>
<ul id="messages"></ul>
<script src="lib/signalr/signalr.js"></script>
<script src="utils.js"></script>
<script>
document.addEventListener('DOMContentLoaded', function () {
let resultsList = document.getElementById('resultsList');
let channelButton = document.getElementById('channelButton');
let sumButton = document.getElementById('sumButton');
let clearButton = document.getElementById('clearButton');
let connectButton = document.getElementById('connectButton');
let disconnectButton = document.getElementById('disconnectButton');
let transportType = signalR.HttpTransportType[getParameterByName('transport')] || signalR.HttpTransportType.WebSockets;
let invocationCounter = 0;
document.getElementById('transportName').innerHTML = signalR.HttpTransportType[transportType];
let connection = null;
click('clearButton', function () {
resultsList.innerHTML = '';
});
click('disconnectButton', function () {
connection.stop();
});
click('connectButton', function () {
connection = new signalR.HubConnectionBuilder()
.configureLogging(signalR.LogLevel.Trace)
.withUrl("/uploading", transportType)
.build();
connection.onclose(function () {
channelButton.disabled = true;
sumButton.disabled = true;
connectButton.disabled = false;
disconnectButton.disabled = true;
addLine('resultsList', 'disconnected', 'green');
});
connection.start()
.then(function () {
channelButton.disabled = false;
sumButton.disabled = false;
connectButton.disabled = true;
disconnectButton.disabled = false;
addLine('resultsList', 'connected', 'green');
});
});
click('sumButton', function () {
run('Sum');
});
click('channelButton', function () {
run('Echo');
});
async function run(method) {
//let id = invocationCounter;
//invocationCounter += 1;
//alert("invoking " + method);
if (method == "Echo") {
var promise = connection.invoke(method, "hello?");
promise.then(function (result) {
alert("received response -- " + result);
});
}
else if (method == "Sum") {
// var data = new Blob(['D', 'R', 'E', 'A', 'M'], { type: 'plain/text', endings: 'native' });
var stream = connection.newUploadStream();
var promise = connection.invoke("UploadWord", stream);
await stream.write("Z");
await stream.write("O");
await stream.write("O");
await stream.write("P");
await stream.write("!");
await stream.complete();
promise.then(function (result) {
alert("received response -- " + result);
});
}
else {
alert("SOMETHING VERY WRONG")
}
//send items
}
});
</script>
</body>
</html>

View File

@ -1,9 +1,11 @@
import { UploadStream } from "./UploadStream";
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
import { HandshakeProtocol, HandshakeRequestMessage, HandshakeResponseMessage } from "./HandshakeProtocol";
import { IConnection } from "./IConnection";
import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamDataMessage, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
import { ILogger, LogLevel } from "./ILogger";
import { IStreamResult } from "./Stream";
import { Arg, Subject } from "./Utils";
@ -29,6 +31,7 @@ export class HubConnection {
private callbacks: { [invocationId: string]: (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => void };
private methods: { [name: string]: Array<(...args: any[]) => void> };
private id: number;
private streamId: number;
private closedCallbacks: Array<(error?: Error) => void>;
private receivedHandshakeResponse: boolean;
private handshakeResolver!: (value?: PromiseLike<{}>) => void;
@ -84,6 +87,7 @@ export class HubConnection {
this.methods = {};
this.closedCallbacks = [];
this.id = 0;
this.streamId = 0;
this.receivedHandshakeResponse = false;
this.connectionState = HubConnectionState.Disconnected;
@ -122,7 +126,7 @@ export class HubConnection {
this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`);
// defensively cleanup timeout in case we receive a message from the server before we finish start
// defensively cleanup timeout in case we receive a message export from the server before we finish start
this.cleanupTimeout();
this.resetTimeoutPeriod();
this.resetKeepAliveInterval();
@ -156,11 +160,10 @@ export class HubConnection {
const subject = new Subject<T>(() => {
const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId);
const cancelMessage: any = this.protocol.writeMessage(cancelInvocation);
delete this.callbacks[invocationDescriptor.invocationId];
return this.sendMessage(cancelMessage);
return this.sendWithProtocol(cancelInvocation);
});
this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => {
@ -181,9 +184,7 @@ export class HubConnection {
}
};
const message = this.protocol.writeMessage(invocationDescriptor);
this.sendMessage(message)
this.sendWithProtocol(invocationDescriptor)
.catch((e) => {
subject.error(e);
delete this.callbacks[invocationDescriptor.invocationId];
@ -197,6 +198,14 @@ export class HubConnection {
return this.connection.send(message);
}
/**
* Sends a js object to the server.
* @param message The js object to serialize and send.
*/
public sendWithProtocol(message: any) {
return this.sendMessage(this.protocol.writeMessage(message));
}
/** Invokes a hub method on the server using the specified name and arguments. Does not wait for a response from the receiver.
*
* The Promise returned by this method resolves when the client has sent the invocation to the server. The server may still
@ -207,11 +216,16 @@ export class HubConnection {
* @returns {Promise<void>} A Promise that resolves when the invocation has been successfully sent, or rejects with an error.
*/
public send(methodName: string, ...args: any[]): Promise<void> {
const invocationDescriptor = this.createInvocation(methodName, args, true);
return this.sendWithProtocol(this.createInvocation(methodName, args, true));
}
const message = this.protocol.writeMessage(invocationDescriptor);
public nextStreamId(): string {
this.streamId += 1;
return this.streamId.toString();
}
return this.sendMessage(message);
public newUploadStream(): UploadStream {
return new UploadStream(this);
}
/** Invokes a hub method on the server using the specified name and arguments.
@ -229,7 +243,7 @@ export class HubConnection {
const invocationDescriptor = this.createInvocation(methodName, args, false);
const p = new Promise<any>((resolve, reject) => {
// invocationId will always have a value for a non-blocking invocation
// invocationId will always have a value for a non-blocking inexport vocation
this.callbacks[invocationDescriptor.invocationId!] = (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => {
if (error) {
reject(error);
@ -248,9 +262,7 @@ export class HubConnection {
}
};
const message = this.protocol.writeMessage(invocationDescriptor);
this.sendMessage(message)
this.sendWithProtocol(invocationDescriptor)
.catch((e) => {
reject(e);
// invocationId will always have a value for a non-blocking invocation
@ -538,4 +550,12 @@ export class HubConnection {
type: MessageType.CancelInvocation,
};
}
public createStreamDataMessage(id: string, item: any): StreamDataMessage {
return {
item,
streamId: id,
type: MessageType.StreamData,
};
}
}

View File

@ -20,6 +20,10 @@ export enum MessageType {
Ping = 6,
/** Indicates the message is a Close message and implements the {@link @aspnet/signalr.CloseMessage} interface. */
Close = 7,
/** Indicates the message is a StreamComplete message and implements the {@link StreamCompleteMessage} interface */
StreamComplete = 8,
/** Indicates the message is a ParamterStreaming message and implements the {@link StreamDataMessage} interface */
StreamData = 9,
}
/** Defines a dictionary of string keys and string values representing headers attached to a Hub message. */
@ -29,14 +33,14 @@ export interface MessageHeaders {
}
/** Union type of all known Hub messages. */
export type HubMessage =
InvocationMessage |
StreamInvocationMessage |
StreamItemMessage |
CompletionMessage |
CancelInvocationMessage |
PingMessage |
CloseMessage;
export type HubMessage = InvocationMessage
| StreamInvocationMessage
| StreamItemMessage
| CompletionMessage
| CancelInvocationMessage
| PingMessage
| CloseMessage
| StreamDataMessage;
/** Defines properties common to all Hub messages. */
export interface HubMessageBase {
@ -91,6 +95,18 @@ export interface StreamItemMessage extends HubInvocationMessage {
readonly item?: any;
}
/** A hub message representing a single stream item, transferred through a streaming parameter. */
export interface StreamDataMessage extends HubMessageBase {
/** @inheritDoc */
readonly type: MessageType.StreamData;
/** The streamId */
readonly streamId: string;
/** The item produced by the client */
readonly item?: any;
}
/** A hub message representing the result of an invocation. */
export interface CompletionMessage extends HubInvocationMessage {
/** @inheritDoc */
@ -137,6 +153,16 @@ export interface CancelInvocationMessage extends HubInvocationMessage {
readonly invocationId: string;
}
/** A hub message send to indicate the end of stream items for a streaming parameter. */
export interface StreamCompleteMessage extends HubMessageBase {
/** @inheritDoc */
readonly type: MessageType.StreamComplete;
/** The stream ID of the stream to be completed. */
readonly streamId: string;
/** The error that trigger completion, if any. */
readonly error?: string;
}
/** A protocol abstraction for communicating with SignalR Hubs. */
export interface IHubProtocol {
/** The name of the protocol. This is used by SignalR to resolve the protocol between the client and server. */

View File

@ -330,6 +330,39 @@ describe("HubConnection", () => {
});
});
it("is able to send stream items to server", async () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
const stream = hubConnection.newUploadStream();
const invokePromise = hubConnection.invoke("testMethod", "arg", stream.placeholder);
expect(JSON.parse(connection.sentData[0])).toEqual({
arguments: ["arg", {streamId: "1"}],
invocationId: "0",
target: "testMethod",
type: MessageType.Invocation,
});
await stream.write("item numero uno");
expect(JSON.parse(connection.sentData[1])).toEqual({
item: "item numero uno",
streamId: "1",
type: MessageType.StreamData,
});
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" });
expect(await invokePromise).toBe("foo");
} finally {
await hubConnection.stop();
}
});
});
it("completes pending invocations when stopped", async () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();

View File

@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Client;
@ -18,7 +19,7 @@ namespace ClientSample
{
cmd.Description = "Tests a streaming invocation from client to hub";
var baseUrlArgument = cmd.Argument("<BASEURL>", "The URL to the Chat Hub to test");
CommandArgument baseUrlArgument = cmd.Argument("<BASEURL>", "The URL to the Chat Hub to test");
cmd.OnExecute(() => ExecuteAsync(baseUrlArgument.Value));
});
@ -31,9 +32,10 @@ namespace ClientSample
.Build();
await connection.StartAsync();
await BasicInvoke(connection);
//await MultiParamInvoke(connection);
//await AdditionalArgs(connection);
//await BasicInvoke(connection);
//await ScoreTrackerExample(connection);
//await FileUploadExample(connection);
await StreamingEcho(connection);
return 0;
}
@ -46,6 +48,7 @@ namespace ClientSample
foreach (var c in "hello")
{
await channel.Writer.WriteAsync(c.ToString());
await Task.Delay(1000);
}
channel.Writer.TryComplete();
@ -53,37 +56,96 @@ namespace ClientSample
Debug.WriteLine($"You message was: {result}");
}
private static async Task WriteStreamAsync<T>(IEnumerable<T> sequence, ChannelWriter<T> writer)
public static async Task ScoreTrackerExample(HubConnection connection)
{
foreach (T element in sequence)
{
await writer.WriteAsync(element);
await Task.Delay(100);
}
// Andrew please add the updated code from your laptop here
writer.TryComplete();
}
public static async Task MultiParamInvoke(HubConnection connection)
{
var letters = Channel.CreateUnbounded<string>();
var numbers = Channel.CreateUnbounded<int>();
_ = WriteStreamAsync(new[] { "h", "i", "!" }, letters.Writer);
_ = WriteStreamAsync(new[] { 1, 2, 3, 4, 5 }, numbers.Writer);
var result = await connection.InvokeAsync<string>("DoubleStreamUpload", letters.Reader, numbers.Reader);
var channel_one = Channel.CreateBounded<int>(2);
var channel_two = Channel.CreateBounded<int>(2);
_ = WriteItemsAsync(channel_one.Writer, new[] { 2, 2, 3 });
_ = WriteItemsAsync(channel_two.Writer, new[] { -2, 5, 3 });
var result = await connection.InvokeAsync<string>("ScoreTracker", channel_one.Reader, channel_two.Reader);
Debug.WriteLine(result);
async Task WriteItemsAsync(ChannelWriter<int> source, IEnumerable<int> scores)
{
await Task.Delay(1000);
foreach (var c in scores)
{
await source.WriteAsync(c);
await Task.Delay(250);
}
// tryComplete triggers the end of this upload's relayLoop
// which sends a StreamComplete to the server
source.TryComplete();
}
}
public static async Task AdditionalArgs(HubConnection connection)
public static async Task FileUploadExample(HubConnection connection)
{
var channel = Channel.CreateUnbounded<char>();
_ = WriteStreamAsync<char>("main message".ToCharArray(), channel.Writer);
var fileNameSource = @"C:\Users\t-dygray\Pictures\weeg.jpg";
var fileNameDest = @"C:\Users\t-dygray\Pictures\TargetFolder\weeg.jpg";
var result = await connection.InvokeAsync<string>("UploadWithSuffix", channel.Reader, " + wooh I'm a suffix");
Debug.WriteLine($"Your message was: {result}");
var channel = Channel.CreateUnbounded<byte[]>();
var invocation = connection.InvokeAsync<string>("UploadFile", fileNameDest, channel.Reader);
using (var file = new FileStream(fileNameSource, FileMode.Open, FileAccess.Read))
{
foreach (var chunk in GetChunks(file, kilobytesPerChunk: 5))
{
await channel.Writer.WriteAsync(chunk);
}
}
channel.Writer.TryComplete();
Debug.WriteLine(await invocation);
}
public static IEnumerable<byte[]> GetChunks(FileStream fileStream, double kilobytesPerChunk)
{
var chunkSize = (int)kilobytesPerChunk * 1024;
var position = 0;
while (true)
{
if (position + chunkSize > fileStream.Length)
{
var lastChunk = new byte[fileStream.Length - position];
fileStream.Read(lastChunk, 0, lastChunk.Length);
yield return lastChunk;
break;
}
var chunk = new byte[chunkSize];
position += fileStream.Read(chunk, 0, chunk.Length);
yield return chunk;
}
}
public static async Task StreamingEcho(HubConnection connection)
{
var channel = Channel.CreateUnbounded<string>();
_ = Task.Run(async () =>
{
foreach (var phrase in new[] { "one fish", "two fish", "red fish", "blue fish" })
{
await channel.Writer.WriteAsync(phrase);
}
});
var outputs = await connection.StreamAsChannelAsync<string>("StreamEcho", channel.Reader);
while (await outputs.WaitToReadAsync())
{
while (outputs.TryRead(out var item))
{
Debug.WriteLine($"received '{item}'.");
}
}
}
}
}

View File

@ -2,10 +2,12 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
@ -14,38 +16,10 @@ namespace SignalRSamples.Hubs
{
public class UploadHub : Hub
{
public async Task<string> DoubleStreamUpload(ChannelReader<string> letters, ChannelReader<int> numbers)
{
var total = await Sum(numbers);
var word = await UploadWord(letters);
return string.Format("You sent over <{0}> <{1}s>", total, word);
}
public async Task<int> Sum(ChannelReader<int> source)
public string Echo(string word)
{
var total = 0;
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
total += item;
}
}
return total;
}
public async Task LocalSum(ChannelReader<int> source)
{
var total = 0;
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
total += item;
}
}
Debug.WriteLine(String.Format("Complete, your total is <{0}>.", total));
return "Echo: " + word;
}
public async Task<string> UploadWord(ChannelReader<string> source)
@ -55,7 +29,7 @@ namespace SignalRSamples.Hubs
// receiving a StreamCompleteMessage should cause this WaitToRead to return false
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
while (source.TryRead(out string item))
{
Debug.WriteLine($"received: {item}");
Console.WriteLine($"received: {item}");
@ -67,29 +41,35 @@ namespace SignalRSamples.Hubs
return sb.ToString();
}
public async Task<string> UploadWithSuffix(ChannelReader<string> source, string suffix)
public async Task<string> ScoreTracker(ChannelReader<int> player1, ChannelReader<int> player2)
{
var sb = new StringBuilder();
var p1score = await Loop(player1);
var p2score = await Loop(player2);
while (await source.WaitToReadAsync())
var winner = p1score > p2score ? "p1" : "p2";
return $"{winner} wins with a total of {Math.Max(p1score, p2score)} points to {Math.Min(p1score, p2score)}";
async Task<int> Loop(ChannelReader<int> reader)
{
while (source.TryRead(out var item))
var score = 0;
while (await reader.WaitToReadAsync())
{
await Task.Delay(50);
Debug.WriteLine($"received: {item}");
sb.Append(item);
while (reader.TryRead(out int item))
{
Debug.WriteLine($"got score {item}");
score += item;
}
}
return score;
}
sb.Append(suffix);
return sb.ToString();
}
public async Task<string> UploadFile(ChannelReader<byte[]> source, string filepath)
public async Task UploadFile(string filepath, ChannelReader<byte[]> source)
{
var result = Enumerable.Empty<byte>();
int chunk = 1;
var chunk = 1;
while (await source.WaitToReadAsync())
{
@ -102,9 +82,27 @@ namespace SignalRSamples.Hubs
}
File.WriteAllBytes(filepath, result.ToArray());
}
Debug.WriteLine("returning status code");
return $"file written to '{filepath}'";
public ChannelReader<string> StreamEcho(ChannelReader<string> source)
{
var output = Channel.CreateUnbounded<string>();
_ = Task.Run(async () =>
{
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
Debug.WriteLine($"Echoing '{item}'.");
await output.Writer.WriteAsync("echo:" + item);
}
}
output.Writer.Complete();
});
return output.Reader;
}
}
}

View File

@ -1,4 +1,4 @@
<!DOCTYPE html>
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
@ -22,5 +22,11 @@
<li><a href="streaming.html?transport=ServerSentEvents">Server Sent Events</a></li>
<li><a href="streaming.html?transport=WebSockets">Web Sockets</a></li>
</ul>
<h1> ASP.NET Core SignalR (Parameter Streaming via Channels) </h1>
<u1>
<li><a href="channelParameters.html?transport=LongPolling">Upload streaming via Long polling</a></li>
<li><a href="channelParameters.html?transport=ServerSentEvents">Upload streaming via SSE</a></li>
<li><a href="channelParameters.html?transport=WebSockets">Upload streaming via Websockets</a></li>
</u1>
</body>
</html>

View File

@ -26,7 +26,7 @@ namespace Microsoft.AspNetCore.SignalR
// TODO #2594 - add Streams here, to make sending files easy
while (type != null)
{
if (type.GetGenericTypeDefinition() == typeof(ChannelReader<>))
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(ChannelReader<>))
{
return true;
}

View File

@ -420,6 +420,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
irq.Dispose();
}
var readers = PackageStreamingParams(args);
CheckDisposed();
await WaitConnectionLockAsync();
@ -444,12 +446,14 @@ namespace Microsoft.AspNetCore.SignalR.Client
ReleaseConnectionLock();
}
LaunchStreams(readers, cancellationToken);
return channel;
}
private Dictionary<string, object> PackageStreamingParams(object[] args)
{
// lazy initialized, to avoid allocation unecessary dictionaries
// lazy initialized, to avoid allocating unecessary dictionaries
Dictionary<string, object> readers = null;
for (var i = 0; i < args.Length; i++)
@ -621,7 +625,6 @@ namespace Microsoft.AspNetCore.SignalR.Client
var readers = PackageStreamingParams(args);
Log.PreparingNonBlockingInvocation(_logger, methodName, args.Length);
var invocationMessage = new InvocationMessage(null, methodName, args);
await SendWithLock(invocationMessage, callerName: nameof(SendCoreAsync));

View File

@ -176,10 +176,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
else
{
bool isStreamCall = descriptor.HasStreamingParameters;
if (isStreamResponse && isStreamCall)
{
throw new NotSupportedException("Streaming responses for streaming uploads are not supported.");
}
return Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamResponse, isStreamCall);
}
}

View File

@ -882,8 +882,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
try
{
await hubConnection.StartAsync().OrTimeout();
var headerValues = await hubConnection.InvokeAsync<string[]>(nameof(TestHub.GetHeaderValues), new[] {"X-test", "X-42"}).OrTimeout();
Assert.Equal(new[] {"42", "test"}, headerValues);
var headerValues = await hubConnection.InvokeAsync<string[]>(nameof(TestHub.GetHeaderValues), new[] { "X-test", "X-42" }).OrTimeout();
Assert.Equal(new[] { "42", "test" }, headerValues);
}
catch (Exception ex)
{
@ -946,8 +946,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
await hubConnection.StartAsync().OrTimeout();
var features = await hubConnection.InvokeAsync<object[]>(nameof(TestHub.GetIHttpConnectionFeatureProperties)).OrTimeout();
var localPort = (Int64)features[0];
var remotePort = (Int64)features[1];
var localPort = (long)features[0];
var remotePort = (long)features[1];
var localIP = (string)features[2];
var remoteIP = (string)features[3];

View File

@ -131,6 +131,23 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
await Clients.Client(Context.ConnectionId).NoClientHandler();
}
public ChannelReader<int> IncrementEach(ChannelReader<int> source)
{
var output = Channel.CreateUnbounded<int>();
_ = Task.Run(async () => {
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
await output.Writer.WriteAsync(item + 1);
}
}
output.Writer.TryComplete();
});
return output.Reader;
}
}
internal static class TestHubMethodsImpl

View File

@ -571,6 +571,26 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
return 42;
}
public ChannelReader<string> StreamEcho(ChannelReader<string> source)
{
Channel<string> output = Channel.CreateUnbounded<string>();
_ = Task.Run(async () =>
{
while (await source.WaitToReadAsync())
{
while (source.TryRead(out string item))
{
await output.Writer.WriteAsync("echo:" + item);
}
}
output.Writer.TryComplete();
});
return output.Reader;
}
}
public class SimpleHub : Hub

View File

@ -2922,13 +2922,12 @@ namespace Microsoft.AspNetCore.SignalR.Tests
[Fact]
public async Task UploadStreamCompleteWithError()
{
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider();
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
await client.ConnectAsync(connectionHandler).OrTimeout();
await client.BeginUploadStreamAsync("invocation", nameof(MethodHub.TestCustomErrorPassing), new StreamPlaceholder("id")).OrTimeout();
await client.SendHubMessageAsync(new StreamCompleteMessage("id", CustomErrorMessage)).OrTimeout();
@ -3041,6 +3040,46 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task CanPassStreamingParameterToStreamHubMethod()
{
IServiceProvider serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider();
HubConnectionHandler<StreamingHub> connectionHandler = serviceProvider.GetService<HubConnectionHandler<StreamingHub>>();
Mock<IInvocationBinder> invocationBinder = new Mock<IInvocationBinder>();
invocationBinder.Setup(b => b.GetStreamItemType(It.IsAny<string>())).Returns(typeof(string));
using (TestClient client = new TestClient(invocationBinder: invocationBinder.Object))
{
Task connectionHandlerTask = await client.ConnectAsync(connectionHandler);
// Wait for a connection, or for the endpoint to fail.
await client.Connected.OrThrowIfOtherFails(connectionHandlerTask).OrTimeout();
var streamId = "sample_id";
var messagePromise = client.StreamAsync("StreamEcho", new StreamPlaceholder(streamId)).OrTimeout();
var phrases = new[] { "asdf", "qwer", "zxcv" };
foreach (var phrase in phrases)
{
await client.SendHubMessageAsync(new StreamDataMessage(streamId, phrase));
}
await client.SendHubMessageAsync(new StreamCompleteMessage(streamId));
var messages = await messagePromise;
// add one because this includes the completion
Assert.Equal(phrases.Count() + 1, messages.Count);
for (var i = 0; i < phrases.Count(); i++)
{
Assert.Equal("echo:" + phrases[i], ((StreamItemMessage)messages[i]).Item);
}
client.Dispose();
await connectionHandlerTask.OrTimeout();
}
}
private class CustomHubActivator<THub> : IHubActivator<THub> where THub : Hub
{
public int ReleaseCount;