Adding MsgPack HubProtocol

This commit is contained in:
Pawel Kadluczka 2017-06-23 16:09:52 -07:00
parent 13ef35fa79
commit 3504337918
10 changed files with 558 additions and 136 deletions

View File

@ -15,6 +15,7 @@
<TestSdkVersion>15.3.0-*</TestSdkVersion>
<XunitVersion>2.3.0-beta2-*</XunitVersion>
<RxVersion>3.1.1</RxVersion>
<MsgPackVersion>0.9.0-beta2</MsgPackVersion>
<!--
TODO remove in next update of xunit

View File

@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
throw new InvalidOperationException("Failed to write message to the output stream");
}
return output.ToArray();
}
}

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;

View File

@ -0,0 +1,233 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using MsgPack;
using MsgPack.Serialization;
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public class MessagePackHubProtocol : IHubProtocol
{
private const int InvocationMessageType = 1;
private const int StreamItemMessageType = 2;
private const int CompletionMessageType = 3;
public bool TryParseMessages(ReadOnlySpan<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
{
messages = new List<HubMessage>();
using (var memoryStream = new MemoryStream(input.ToArray()))
{
messages.Add(ParseMessage(memoryStream, binder));
}
return messages.Count > 0;
}
private static HubMessage ParseMessage(Stream input, IInvocationBinder binder)
{
var unpacker = Unpacker.Create(input);
if (!unpacker.ReadInt32(out var messageType))
{
throw new FormatException("Message type is missing.");
}
switch (messageType)
{
case InvocationMessageType:
return CreateInvocationMessage(unpacker, binder);
case StreamItemMessageType:
return CreateStreamItemMessage(unpacker, binder);
case CompletionMessageType:
return CreateCompletionMessage(unpacker, binder);
default:
throw new FormatException($"Invalid message type: {messageType}.");
}
}
private static InvocationMessage CreateInvocationMessage(Unpacker unpacker, IInvocationBinder binder)
{
var invocationId = ReadInvocationId(unpacker);
var nonBlocking = ReadBoolean(unpacker, "nonBlocking");
var target = ReadString(unpacker, "target");
var argumentCount = ReadArrayLength(unpacker, "arguments");
var parameterTypes = binder.GetParameterTypes(target);
if (parameterTypes.Length != argumentCount)
{
throw new FormatException(
$"Target method expects {parameterTypes.Length} arguments(s) but invocation has {argumentCount} argument(s).");
}
var arguments = new object[argumentCount];
for (var i = 0; i < argumentCount; i++)
{
arguments[i] = DeserializeObject(unpacker, parameterTypes[i], "argument");
}
return new InvocationMessage(invocationId, nonBlocking, target, arguments);
}
private static StreamItemMessage CreateStreamItemMessage(Unpacker unpacker, IInvocationBinder binder)
{
var invocationId = ReadInvocationId(unpacker);
var itemType = binder.GetReturnType(invocationId);
var value = DeserializeObject(unpacker, itemType, "item");
return new StreamItemMessage(invocationId, value);
}
private static CompletionMessage CreateCompletionMessage(Unpacker unpacker, IInvocationBinder binder)
{
var invocationId = ReadInvocationId(unpacker);
var error = ReadString(unpacker, "error");
var hasResult = false;
object result = null;
if (error == null)
{
hasResult = ReadBoolean(unpacker, "hasResult");
if (hasResult)
{
var itemType = binder.GetReturnType(invocationId);
result = DeserializeObject(unpacker, itemType, "argument");
}
}
return new CompletionMessage(invocationId, error, result, hasResult);
}
// TODO: when to return false?
public bool TryWriteMessage(HubMessage message, Stream output)
{
var packer = Packer.Create(output);
switch (message)
{
case InvocationMessage invocationMessage:
WriteInvocationMessage(invocationMessage, packer, output);
break;
case StreamItemMessage streamItemMessage:
WriteStremingItemMessage(streamItemMessage, packer, output);
break;
case CompletionMessage completionMessage:
WriteCompletionMessage(completionMessage, packer, output);
break;
default:
throw new FormatException($"Unexpected message type: {message.GetType().Name}");
}
return true;
}
private static void WriteInvocationMessage(InvocationMessage invocationMessage, Packer packer, Stream output)
{
packer.Pack(InvocationMessageType);
packer.PackString(invocationMessage.InvocationId);
packer.Pack(invocationMessage.NonBlocking);
packer.PackString(invocationMessage.Target);
packer.PackObject(invocationMessage.Arguments);
}
private void WriteStremingItemMessage(StreamItemMessage streamItemMessage, Packer packer, Stream output)
{
packer.Pack(StreamItemMessageType);
packer.PackString(streamItemMessage.InvocationId);
packer.PackObject(streamItemMessage.Item);
}
private void WriteCompletionMessage(CompletionMessage completionMessage, Packer packer, Stream output)
{
packer.Pack(CompletionMessageType);
packer.PackString(completionMessage.InvocationId);
packer.PackString(completionMessage.Error);
if (completionMessage.Error == null)
{
packer.Pack(completionMessage.HasResult);
if (completionMessage.HasResult)
{
packer.PackObject(completionMessage.Result);
}
}
}
private static string ReadInvocationId(Unpacker unpacker)
{
return ReadString(unpacker, "invocationId");
}
private static string ReadString(Unpacker unpacker, string field)
{
Exception msgPackException = null;
try
{
if (unpacker.ReadString(out var value))
{
return value;
}
}
catch (Exception e)
{
msgPackException = e;
}
throw new FormatException($"Reading '{field}' as String failed.", msgPackException);
}
private static bool ReadBoolean(Unpacker unpacker, string field)
{
Exception msgPackException = null;
try
{
if (unpacker.ReadBoolean(out var value))
{
return value;
}
}
catch (Exception e)
{
msgPackException = e;
}
throw new FormatException($"Reading '{field}' as Boolean failed.", msgPackException);
}
private static long ReadArrayLength(Unpacker unpacker, string field)
{
Exception msgPackException = null;
try
{
if (unpacker.ReadArrayLength(out var value))
{
return value;
}
}
catch (Exception e)
{
msgPackException = e;
}
throw new FormatException($"Reading array length for '{field}' failed.", msgPackException);
}
private static object DeserializeObject(Unpacker unpacker, Type type, string field)
{
Exception msgPackException = null;
try
{
if (unpacker.Read())
{
var serializer = MessagePackSerializer.Get(type);
return serializer.UnpackFrom(unpacker);
}
}
catch (Exception ex)
{
msgPackException = ex;
}
throw new FormatException($"Deserializing object of the `{type.Name}` type for '{field}' failed.", msgPackException);
}
}
}

View File

@ -16,6 +16,7 @@
<PackageReference Include="Newtonsoft.Json" Version="$(JsonNetVersion)" />
<PackageReference Include="System.Buffers.Primitives" Version="$(CoreFxLabsVersion)" />
<PackageReference Include="System.Binary" Version="$(CoreFxLabsVersion)" />
<PackageReference Include="MsgPack.Cli" Version="$(MsgPackVersion)" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,42 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
{
public class CustomObject : IEquatable<CustomObject>
{
// Not intended to be a full set of things, just a smattering of sample serializations
public string StringProp { get; set; } = "SignalR!";
public double DoubleProp { get; set; } = 6.2831853071;
public int IntProp { get; set; } = 42;
public DateTime DateTimeProp { get; set; } = new DateTime(2017, 4, 11);
public object NullProp { get; set; } = null;
public override bool Equals(object obj)
{
return obj is CustomObject o && Equals(o);
}
public override int GetHashCode()
{
// This is never used in a hash table
return 0;
}
public bool Equals(CustomObject right)
{
// This allows the comparer below to properly compare the object in the test.
return string.Equals(StringProp, right.StringProp, StringComparison.Ordinal) &&
DoubleProp == right.DoubleProp &&
IntProp == right.IntProp &&
DateTime.Equals(DateTimeProp, right.DateTimeProp) &&
NullProp == right.NullProp;
}
}
}

View File

@ -1,10 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Newtonsoft.Json;
@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
var protocol = new JsonHubProtocol(jsonSerializer);
protocol.TryParseMessages(Encoding.UTF8.GetBytes(input), binder, out var messages);
Assert.Equal(expectedMessage, messages[0], TestEqualityComparer.Instance);
Assert.Equal(expectedMessage, messages[0], TestHubMessageEqualityComparer.Instance);
}
[Theory]
@ -146,135 +146,5 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
Assert.True(TextMessageFormatter.TryWriteMessage(message, output));
return output.ToArray();
}
private class CustomObject : IEquatable<CustomObject>
{
// Not intended to be a full set of things, just a smattering of sample serializations
public string StringProp => "SignalR!";
public double DoubleProp => 6.2831853071;
public int IntProp => 42;
public DateTime DateTimeProp => new DateTime(2017, 4, 11);
public object NullProp => null;
public override bool Equals(object obj)
{
return obj is CustomObject o && Equals(o);
}
public override int GetHashCode()
{
// This is never used in a hash table
return 0;
}
public bool Equals(CustomObject right)
{
// This allows the comparer below to properly compare the object in the test.
return string.Equals(StringProp, right.StringProp, StringComparison.Ordinal) &&
DoubleProp == right.DoubleProp &&
IntProp == right.IntProp &&
DateTime.Equals(DateTimeProp, right.DateTimeProp) &&
NullProp == right.NullProp;
}
}
// Binder that works based on the expected message argument/result types :)
private class TestBinder : IInvocationBinder
{
private readonly Type[] _paramTypes;
private readonly Type _returnType;
public TestBinder(HubMessage expectedMessage)
{
switch(expectedMessage)
{
case InvocationMessage i:
_paramTypes = i.Arguments?.Select(a => a?.GetType() ?? typeof(object))?.ToArray();
break;
case StreamItemMessage s:
_returnType = s.Item?.GetType() ?? typeof(object);
break;
case CompletionMessage c:
_returnType = c.Result?.GetType() ?? typeof(object);
break;
}
}
public TestBinder() : this(null, null) { }
public TestBinder(Type[] paramTypes) : this(paramTypes, null) { }
public TestBinder(Type returnType) : this(null, returnType) {}
public TestBinder(Type[] paramTypes, Type returnType)
{
_paramTypes = paramTypes;
_returnType = returnType;
}
public Type[] GetParameterTypes(string methodName)
{
if (_paramTypes != null)
{
return _paramTypes;
}
throw new InvalidOperationException("Unexpected binder call");
}
public Type GetReturnType(string invocationId)
{
if (_returnType != null)
{
return _returnType;
}
throw new InvalidOperationException("Unexpected binder call");
}
}
private class TestEqualityComparer : IEqualityComparer<HubMessage>
{
public static readonly TestEqualityComparer Instance = new TestEqualityComparer();
private TestEqualityComparer() { }
public bool Equals(HubMessage x, HubMessage y)
{
if (!string.Equals(x.InvocationId, y.InvocationId, StringComparison.Ordinal))
{
return false;
}
return InvocationMessagesEqual(x, y) || StreamItemMessagesEqual(x, y) || CompletionMessagesEqual(x, y);
}
private bool CompletionMessagesEqual(HubMessage x, HubMessage y)
{
return x is CompletionMessage left && y is CompletionMessage right &&
string.Equals(left.Error, right.Error, StringComparison.Ordinal) &&
Equals(left.Result, right.Result) &&
left.HasResult == right.HasResult;
}
private bool StreamItemMessagesEqual(HubMessage x, HubMessage y)
{
return x is StreamItemMessage left && y is StreamItemMessage right &&
Equals(left.Item, right.Item);
}
private bool InvocationMessagesEqual(HubMessage x, HubMessage y)
{
return x is InvocationMessage left && y is InvocationMessage right &&
string.Equals(left.Target, right.Target, StringComparison.Ordinal) &&
Enumerable.SequenceEqual(left.Arguments, right.Arguments) &&
left.NonBlocking == right.NonBlocking;
}
public int GetHashCode(HubMessage obj)
{
// We never use these in a hash-table
return 0;
}
}
}
}

View File

@ -0,0 +1,110 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using MsgPack;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
{
public class MessagePackHubProtocolTests
{
private static readonly MessagePackHubProtocol _hubProtocol = new MessagePackHubProtocol();
public static IEnumerable<object[]> TestMessages => new[]
{
new object[]{ new InvocationMessage("xyz", /*nonBlocking*/ false, "method") },
new object[]{ new InvocationMessage("xyz", /*nonBlocking*/ true, "method") },
new object[]{ new InvocationMessage("xyz", /*nonBlocking*/ true, "method", new object[] { null } ) },
new object[]{ new InvocationMessage("xyz", /*nonBlocking*/ true, "method", 42) },
new object[]{ new InvocationMessage("xyz", /*nonBlocking*/ true, "method", 42, "string") },
new object[]{ new InvocationMessage("xyz", /*nonBlocking*/ true, "method", 42, "string", new CustomObject()) },
new object[]{ new InvocationMessage("xyz", /*nonBlocking*/ true, "method", new[] { new CustomObject(), new CustomObject() }) },
new object[]{ new CompletionMessage("xyz", error: "Error not found!", result: null, hasResult: false) },
new object[]{ new CompletionMessage("xyz", error: null, result: null, hasResult: false) },
new object[]{ new CompletionMessage("xyz", error: null, result: null, hasResult: true) },
new object[]{ new CompletionMessage("xyz", error: null, result: 42, hasResult: true) },
new object[]{ new CompletionMessage("xyz", error: null, result: 42.0f, hasResult: true) },
new object[]{ new CompletionMessage("xyz", error: null, result: "string", hasResult: true) },
new object[]{ new CompletionMessage("xyz", error: null, result: true, hasResult: true) },
new object[]{ new CompletionMessage("xyz", error: null, result: new CustomObject(), hasResult: true) },
new object[]{ new CompletionMessage("xyz", error: null, result: new[] { new CustomObject(), new CustomObject() }, hasResult: true) },
new object[]{ new StreamItemMessage("xyz", null)},
new object[]{ new StreamItemMessage("xyz", 42)},
new object[]{ new StreamItemMessage("xyz", 42.0f)},
new object[]{ new StreamItemMessage("xyz", "string")},
new object[]{ new StreamItemMessage("xyz", true)},
new object[]{ new StreamItemMessage("xyz", new CustomObject())},
new object[]{ new StreamItemMessage("xyz", new[] { new CustomObject(), new CustomObject() })}
};
[Theory]
[MemberData(nameof(TestMessages))]
public void CanRoundTripInvocationMessage(HubMessage hubMessage)
{
using (var memoryStream = new MemoryStream())
{
_hubProtocol.TryWriteMessage(hubMessage, memoryStream);
_hubProtocol.TryParseMessages(
new ReadOnlySpan<byte>(memoryStream.ToArray()), new TestBinder(hubMessage), out var messages);
Assert.Equal(1, messages.Count);
Assert.Equal(hubMessage, messages[0], TestHubMessageEqualityComparer.Instance);
}
}
public static IEnumerable<object[]> InvalidPayloads => new[]
{
new object[] { new byte[0], "Message type is missing." },
new object[] { new byte[] { 0x0a } , "Invalid message type: 10." },
// InvocationMessage
new object[] { new byte[] { 0x01 }, "Reading 'invocationId' as String failed." }, // invocationId missing
new object[] { new byte[] { 0x01, 0xc2 }, "Reading 'invocationId' as String failed." }, // 0xc2 is Bool false
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a }, "Reading 'nonBlocking' as Boolean failed." }, // nonBlocking missing
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0x00 }, "Reading 'nonBlocking' as Boolean failed." }, // nonBlocking is not bool
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2 }, "Reading 'target' as String failed." }, // target missing
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0x00 }, "Reading 'target' as String failed." }, // 0x00 is Int
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0xa1 }, "Reading 'target' as String failed." }, // string is cut
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0xa1, 0x78 }, "Reading array length for 'arguments' failed." }, // array is missing
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0xa1, 0x78, 0x00 }, "Reading array length for 'arguments' failed." }, // 0x00 is not array marker
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0xa1, 0x78, 0x91 }, "Deserializing object of the `String` type for 'argument' failed." }, // array is missing elements
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0xa1, 0x78, 0x91, 0xa2, 0x78 }, "Deserializing object of the `String` type for 'argument' failed." }, // array element is cut
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0xa1, 0x78, 0x92, 0xa0, 0x00 }, "Target method expects 1 arguments(s) but invocation has 2 argument(s)." }, // argument count does not match binder argument count
new object[] { new byte[] { 0x01, 0xa3, 0x78, 0x79, 0x7a, 0xc2, 0xa1, 0x78, 0x91, 0x00 }, "Deserializing object of the `String` type for 'argument' failed." }, // argument type mismatch
// StreamItemMessage
new object[] { new byte[] { 0x02 }, "Reading 'invocationId' as String failed." }, // 0xc2 is Bool false
new object[] { new byte[] { 0x02, 0xc2 }, "Reading 'invocationId' as String failed." }, // 0xc2 is Bool false
new object[] { new byte[] { 0x02, 0xa3, 0x78, 0x79, 0x7a }, "Deserializing object of the `String` type for 'item' failed." }, // item is missing
new object[] { new byte[] { 0x02, 0xa3, 0x78, 0x79, 0x7a, 0x00 }, "Deserializing object of the `String` type for 'item' failed." }, // item type mismatch
// CompletionMessage
new object[] { new byte[] { 0x03 }, "Reading 'invocationId' as String failed." }, // 0xc2 is Bool false
new object[] { new byte[] { 0x03, 0xc2 }, "Reading 'invocationId' as String failed." }, // 0xc2 is Bool false
new object[] { new byte[] { 0x03, 0xa3, 0x78, 0x79, 0x7a, 0xc2 }, "Reading 'error' as String failed." }, // 0xc2 is Bool false
new object[] { new byte[] { 0x03, 0xa3, 0x78, 0x79, 0x7a, 0xa1 }, "Reading 'error' as String failed." }, // error is cut
new object[] { new byte[] { 0x03, 0xa3, 0x78, 0x79, 0x7a, 0xc0 }, "Reading 'hasResult' as Boolean failed." }, // hasResult missing
new object[] { new byte[] { 0x03, 0xa3, 0x78, 0x79, 0x7a, 0xc0, 0xa0 }, "Reading 'hasResult' as Boolean failed." }, // 0xa0 is string
new object[] { new byte[] { 0x03, 0xa3, 0x78, 0x79, 0x7a, 0xc0, 0xc3 }, "Deserializing object of the `String` type for 'argument' failed." }, // result missing
new object[] { new byte[] { 0x03, 0xa3, 0x78, 0x79, 0x7a, 0xc0, 0xc3, 0xa9 }, "Deserializing object of the `String` type for 'argument' failed." }, // result is cut
new object[] { new byte[] { 0x03, 0xa3, 0x78, 0x79, 0x7a, 0xc0, 0xc3, 0x00 }, "Deserializing object of the `String` type for 'argument' failed." } // return type mismatch
};
[Theory]
[MemberData(nameof(InvalidPayloads))]
public void ParserThrowsForInvalidMessages(byte[] payload, string expectedExceptionMessage)
{
var binder = new TestBinder(new[] { typeof(string) }, typeof(string));
var exception = Assert.Throws<FormatException>(() =>
_hubProtocol.TryParseMessages(new ReadOnlySpan<byte>(payload), binder, out var messages));
Assert.Equal(expectedExceptionMessage, exception.Message);
}
}
}

View File

@ -0,0 +1,59 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
{
public class TestBinder : IInvocationBinder
{
private readonly Type[] _paramTypes;
private readonly Type _returnType;
public TestBinder(HubMessage expectedMessage)
{
switch (expectedMessage)
{
case InvocationMessage i:
_paramTypes = i.Arguments?.Select(a => a?.GetType() ?? typeof(object))?.ToArray();
break;
case StreamItemMessage s:
_returnType = s.Item?.GetType() ?? typeof(object);
break;
case CompletionMessage c:
_returnType = c.Result?.GetType() ?? typeof(object);
break;
}
}
public TestBinder() : this(null, null) { }
public TestBinder(Type[] paramTypes) : this(paramTypes, null) { }
public TestBinder(Type returnType) : this(null, returnType) { }
public TestBinder(Type[] paramTypes, Type returnType)
{
_paramTypes = paramTypes;
_returnType = returnType;
}
public Type[] GetParameterTypes(string methodName)
{
if (_paramTypes != null)
{
return _paramTypes;
}
throw new InvalidOperationException("Unexpected binder call");
}
public Type GetReturnType(string invocationId)
{
if (_returnType != null)
{
return _returnType;
}
throw new InvalidOperationException("Unexpected binder call");
}
}
}

View File

@ -0,0 +1,107 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
{
public class TestHubMessageEqualityComparer : IEqualityComparer<HubMessage>
{
public static readonly TestHubMessageEqualityComparer Instance = new TestHubMessageEqualityComparer();
private TestHubMessageEqualityComparer() { }
public bool Equals(HubMessage x, HubMessage y)
{
if (!string.Equals(x.InvocationId, y.InvocationId, StringComparison.Ordinal))
{
return false;
}
return InvocationMessagesEqual(x, y) || StreamItemMessagesEqual(x, y) || CompletionMessagesEqual(x, y);
}
private bool CompletionMessagesEqual(HubMessage x, HubMessage y)
{
return x is CompletionMessage left && y is CompletionMessage right &&
string.Equals(left.Error, right.Error, StringComparison.Ordinal) &&
left.HasResult == right.HasResult &&
(Equals(left.Result, right.Result) || SequenceEqual(left.Result, right.Result));
}
private bool StreamItemMessagesEqual(HubMessage x, HubMessage y)
{
return x is StreamItemMessage left && y is StreamItemMessage right &&
(Equals(left.Item, right.Item) || SequenceEqual(left.Item, right.Item));
}
private bool InvocationMessagesEqual(HubMessage x, HubMessage y)
{
return x is InvocationMessage left && y is InvocationMessage right &&
string.Equals(left.Target, right.Target, StringComparison.Ordinal) &&
ArgumentListsEqual(left.Arguments, right.Arguments) &&
left.NonBlocking == right.NonBlocking;
}
private bool ArgumentListsEqual(object[] left, object[] right)
{
if (left == right)
{
return true;
}
if (left == null || right == null || left.Length != right.Length)
{
return false;
}
for (var i = 0; i < left.Length; i++)
{
if (!(Equals(left[i], right[i]) || SequenceEqual(left[i], right[i])))
{
return false;
}
}
return true;
}
private bool SequenceEqual(object left, object right)
{
if (left == null && right == null)
{
return true;
}
var leftEnumerable = left as IEnumerable;
var rightEnumerable = right as IEnumerable;
if (leftEnumerable == null || rightEnumerable == null)
{
return false;
}
var leftEnumerator = leftEnumerable.GetEnumerator();
var rightEnumerator = rightEnumerable.GetEnumerator();
var leftMoved = leftEnumerator.MoveNext();
var rightMoved = rightEnumerator.MoveNext();
for (; leftMoved && rightMoved; leftMoved = leftEnumerator.MoveNext(), rightMoved = rightEnumerator.MoveNext())
{
if (!Equals(leftEnumerator.Current, rightEnumerator.Current))
{
return false;
}
}
return !leftMoved && !rightMoved;
}
public int GetHashCode(HubMessage obj)
{
// We never use these in a hash-table
return 0;
}
}
}