Fix redis subscriptions and missing awaits

This commit is contained in:
BrennanConroy 2016-11-03 14:58:27 -07:00
parent b114e4e9fd
commit 60bc57fb8d
2 changed files with 17 additions and 15 deletions

View File

@ -1,6 +1,7 @@
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
@ -39,7 +40,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
Arguments = args
};
return PublishAsync(typeof(THub).Name, message);
return PublishAsync(typeof(THub).FullName, message);
}
public override Task InvokeConnectionAsync(string connectionId, string methodName, params object[] args)
@ -50,7 +51,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
Arguments = args
};
return PublishAsync(typeof(THub) + "." + connectionId, message);
return PublishAsync(typeof(THub).FullName + "." + connectionId, message);
}
public override Task InvokeGroupAsync(string groupName, string methodName, params object[] args)
@ -61,7 +62,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
Arguments = args
};
return PublishAsync(typeof(THub) + "." + groupName, message);
return PublishAsync(typeof(THub).FullName + "." + groupName, message);
}
public override Task InvokeUserAsync(string userId, string methodName, params object[] args)
@ -72,10 +73,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis
Arguments = args
};
return PublishAsync(typeof(THub) + "." + userId, message);
return PublishAsync(typeof(THub).FullName + "." + userId, message);
}
private Task PublishAsync(string channel, InvocationDescriptor message)
private async Task PublishAsync(string channel, InvocationDescriptor message)
{
// TODO: What format??
var invocationAdapter = _registry.GetInvocationAdapter("json");
@ -83,17 +84,17 @@ namespace Microsoft.AspNetCore.SignalR.Redis
// BAD
using (var ms = new MemoryStream())
{
invocationAdapter.WriteInvocationDescriptorAsync(message, ms);
await invocationAdapter.WriteInvocationDescriptorAsync(message, ms);
return _bus.PublishAsync(channel, ms.ToArray());
await _bus.PublishAsync(channel, ms.ToArray());
}
}
public override Task OnConnectedAsync(Connection connection)
{
var task1 = SubscribeAsync(typeof(THub).Name, connection);
var task2 = SubscribeAsync(typeof(THub).Name + "." + connection.ConnectionId, connection);
var task3 = SubscribeAsync(typeof(THub).Name + "." + connection.User.Identity.Name, connection);
var task1 = SubscribeAsync(typeof(THub).FullName, connection);
var task2 = SubscribeAsync(typeof(THub).FullName + "." + connection.ConnectionId, connection);
var task3 = SubscribeAsync(typeof(THub).FullName + "." + connection.User.Identity.Name, connection);
return Task.WhenAll(task2, task2, task3);
}
@ -115,13 +116,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis
public override Task AddGroupAsync(Connection connection, string groupName)
{
var key = typeof(THub).Name + "." + groupName;
var key = typeof(THub).FullName + "." + groupName;
return SubscribeAsync(key, connection);
}
public override Task RemoveGroupAsync(Connection connection, string groupName)
{
var key = typeof(THub) + "." + groupName;
var key = typeof(THub).FullName + "." + groupName;
return UnsubscribeAsync(key, connection);
}
@ -138,7 +139,8 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return subscriber.SubscribeAsync(channel, (c, data) =>
{
connection.Channel.Output.WriteAsync((byte[])data);
// TODO: Use Task Queue
connection.Channel.Output.WriteAsync((byte[])data).GetAwaiter().GetResult();
});
}

View File

@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
public ConfigurationOptions Options { get; set; } = new ConfigurationOptions();
public Func<ConnectionMultiplexer> Factory { get; set; }
public Func<TextWriter, ConnectionMultiplexer> Factory { get; set; }
// TODO: Async
internal ConnectionMultiplexer Connect(TextWriter log)
@ -26,7 +26,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return ConnectionMultiplexer.Connect(Options, log);
}
return Factory();
return Factory(log);
}
}
}