Fixup handshake logic and layering (#7470)

This commit is contained in:
BrennanConroy 2019-02-12 17:06:56 -08:00 committed by GitHub
parent 8f72b87981
commit 1c6651a1ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 385 additions and 363 deletions

View File

@ -9,13 +9,6 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "signalrclienttests", "test\
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "signalrclientdll", "src\signalrclientdll\Build\VS\signalrclientdll.vcxproj", "{18377AE8-E372-40CE-94FD-7F65008D39A3}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{063421D3-4E32-4BE5-874A-2E784B450858}"
ProjectSection(SolutionItems) = preProject
.nuget\NuGet.Config = .nuget\NuGet.Config
.nuget\NuGet.exe = .nuget\NuGet.exe
.nuget\NuGet.targets = .nuget\NuGet.targets
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{AABF08B1-12A4-4D06-A188-F01FBF8A9658}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "signalrclient-e2e-tests", "test\signalrclient-e2e-tests\Build\VS\signalrclient-e2e-tests.vcxproj", "{6006C96A-29F0-4B18-8DDD-764DC3419E2F}"
@ -23,6 +16,9 @@ EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "signalrclient-testhost", "test\signalrclient-testhost\signalrclient-testhost.csproj", "{11848039-1F13-4047-9539-8F9F45930788}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "HubConnectionSample", "samples\HubConnectionSample\HubConnectionSample.vcxproj", "{3C9BD092-18E6-4C6E-A887-CDFC80ACB206}"
ProjectSection(ProjectDependencies) = postProject
{18377AE8-E372-40CE-94FD-7F65008D39A3} = {18377AE8-E372-40CE-94FD-7F65008D39A3}
EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gtest", "test\gtest\gtest.vcxproj", "{CAC1267B-8778-4257-AAC6-CAF481723B01}"
EndProject

View File

@ -28,7 +28,7 @@ namespace signalr
void connection::set_message_received(const message_received_handler& message_received_callback)
{
m_pImpl->set_message_received_string(message_received_callback);
m_pImpl->set_message_received(message_received_callback);
}
void connection::set_disconnected(const std::function<void()>& disconnected_callback)

View File

@ -39,7 +39,7 @@ namespace signalr
std::unique_ptr<web_request_factory> web_request_factory, std::unique_ptr<transport_factory> transport_factory)
: m_base_url(url), m_query_string(query_string), m_connection_state(connection_state::disconnected), m_logger(log_writer, trace_level),
m_transport(nullptr), m_web_request_factory(std::move(web_request_factory)), m_transport_factory(std::move(transport_factory)),
m_message_received([](const web::json::value&){}), m_disconnected([](){}), m_handshakeReceived(false)
m_message_received([](const utility::string_t&){}), m_disconnected([](){})
{ }
connection_impl::~connection_impl()
@ -130,7 +130,7 @@ namespace signalr
if (task_canceled_exception)
{
connection->m_logger.log(trace_level::info,
_XPLATSTR("starting the connection has been cancelled."));
_XPLATSTR("starting the connection has been canceled."));
}
else
{
@ -160,7 +160,7 @@ namespace signalr
auto& logger = m_logger;
auto process_response_callback =
[weak_connection, connect_request_tce, disconnect_cts, logger](const utility::string_t& response) mutable
[weak_connection, disconnect_cts, logger](const utility::string_t& response) mutable
{
// When a connection is stopped we don't wait for its transport to stop. As a result if the same connection
// is immediately re-started the old transport can still invoke this callback. To prevent this we capture
@ -177,7 +177,7 @@ namespace signalr
auto connection = weak_connection.lock();
if (connection)
{
connection->process_response(response, connect_request_tce);
connection->process_response(response);
}
};
@ -240,7 +240,7 @@ namespace signalr
try
{
connect_task.get();
transport->send(_XPLATSTR("{\"protocol\":\"json\",\"version\":1}\x1e")).get();
connect_request_tce.set();
}
catch (const std::exception& e)
{
@ -256,109 +256,15 @@ namespace signalr
return pplx::create_task(connect_request_tce);
}
enum MessageType
{
Invocation = 1,
StreamItem,
Completion,
StreamInvocation,
CancelInvocation,
Ping,
Close,
};
void connection_impl::process_response(const utility::string_t& response, const pplx::task_completion_event<void>& connect_request_tce)
void connection_impl::process_response(const utility::string_t& response)
{
m_logger.log(trace_level::messages,
utility::string_t(_XPLATSTR("processing message: ")).append(response));
try
{
auto pos = response.find('\x1e');
std::size_t lastPos = 0;
while (pos != utility::string_t::npos)
{
auto message = response.substr(lastPos, pos - lastPos);
const auto result = web::json::value::parse(message);
if (!result.is_object())
{
m_logger.log(trace_level::info, utility::string_t(_XPLATSTR("unexpected response received from the server: "))
.append(message));
return;
}
if (!m_handshakeReceived)
{
if (result.has_field(_XPLATSTR("error")))
{
auto error = result.at(_XPLATSTR("error")).as_string();
m_logger.log(trace_level::errors, utility::string_t(_XPLATSTR("handshake error: "))
.append(error));
connect_request_tce.set_exception(signalr_exception(utility::string_t(_XPLATSTR("Received an error during handshake: ")).append(error)));
return;
}
else
{
if (result.size() != 0)
{
connect_request_tce.set_exception(signalr_exception(utility::string_t(_XPLATSTR("Received unexpected message while waiting for the handshake response."))));
}
m_handshakeReceived = true;
connect_request_tce.set();
return;
}
}
auto messageType = result.at(_XPLATSTR("type"));
switch (messageType.as_integer())
{
case MessageType::Invocation:
{
invoke_message_received(result);
break;
}
case MessageType::StreamInvocation:
// Sent to server only, should not be received by client
throw std::runtime_error("Received unexpected message type 'StreamInvocation'.");
case MessageType::StreamItem:
// TODO
break;
case MessageType::Completion:
{
if (result.has_field(_XPLATSTR("error")) && result.has_field(_XPLATSTR("result")))
{
// TODO: error
}
invoke_message_received(result);
break;
}
case MessageType::CancelInvocation:
// Sent to server only, should not be received by client
throw std::runtime_error("Received unexpected message type 'CancelInvocation'.");
case MessageType::Ping:
// TODO
break;
case MessageType::Close:
// TODO
break;
}
lastPos = pos + 1;
pos = response.find('\x1e', lastPos);
}
}
catch (const std::exception &e)
{
m_logger.log(trace_level::errors, utility::string_t(_XPLATSTR("error occured when parsing response: "))
.append(utility::conversions::to_string_t(e.what()))
.append(_XPLATSTR(". response: "))
.append(response));
}
invoke_message_received(response);
}
void connection_impl::invoke_message_received(const web::json::value& message)
void connection_impl::invoke_message_received(const utility::string_t& message)
{
try
{
@ -458,7 +364,6 @@ namespace signalr
// This function is called from the dtor so you must not use `shared_from_this` here (it will throw).
pplx::task<void> connection_impl::shutdown()
{
m_handshakeReceived = false;
{
std::lock_guard<std::mutex> lock(m_stop_lock);
m_logger.log(trace_level::info, _XPLATSTR("acquired lock in shutdown()"));
@ -518,15 +423,7 @@ namespace signalr
return m_connection_id;
}
void connection_impl::set_message_received_string(const std::function<void(const utility::string_t&)>& message_received)
{
set_message_received_json([message_received](const web::json::value& payload)
{
message_received(payload.is_string() ? payload.as_string() : payload.serialize());
});
}
void connection_impl::set_message_received_json(const std::function<void(const web::json::value&)>& message_received)
void connection_impl::set_message_received(const std::function<void(const utility::string_t&)>& message_received)
{
ensure_disconnected(_XPLATSTR("cannot set the callback when the connection is not in the disconnected state. "));
m_message_received = message_received;

View File

@ -44,8 +44,7 @@ namespace signalr
connection_state get_connection_state() const;
utility::string_t get_connection_id() const;
void set_message_received_string(const std::function<void(const utility::string_t&)>& message_received);
void set_message_received_json(const std::function<void(const web::json::value&)>& message_received);
void set_message_received(const std::function<void(const utility::string_t&)>& message_received);
void set_disconnected(const std::function<void()>& disconnected);
void set_client_config(const signalr_client_config& config);
@ -60,7 +59,7 @@ namespace signalr
std::unique_ptr<web_request_factory> m_web_request_factory;
std::unique_ptr<transport_factory> m_transport_factory;
std::function<void(const web::json::value&)> m_message_received;
std::function<void(const utility::string_t&)> m_message_received;
std::function<void()> m_disconnected;
signalr_client_config m_signalr_client_config;
@ -71,7 +70,6 @@ namespace signalr
utility::string_t m_connection_data;
utility::string_t m_message_id;
utility::string_t m_groups_token;
bool m_handshakeReceived;
connection_impl(const utility::string_t& url, const utility::string_t& query_string, trace_level trace_level, const std::shared_ptr<log_writer>& log_writer,
std::unique_ptr<web_request_factory> web_request_factory, std::unique_ptr<transport_factory> transport_factory);
@ -80,14 +78,14 @@ namespace signalr
pplx::task<void> send_connect_request(const std::shared_ptr<transport>& transport,
const pplx::task_completion_event<void>& connect_request_tce);
void process_response(const utility::string_t& response, const pplx::task_completion_event<void>& connect_request_tce);
void process_response(const utility::string_t& response);
pplx::task<void> shutdown();
bool change_state(connection_state old_state, connection_state new_state);
connection_state change_state(connection_state new_state);
void handle_connection_state_change(connection_state old_state, connection_state new_state);
void invoke_message_received(const web::json::value& message);
void invoke_message_received(const utility::string_t& message);
static utility::string_t translate_connection_state(connection_state state);
void ensure_disconnected(const utility::string_t& error_message);

View File

@ -44,7 +44,8 @@ namespace signalr
std::unique_ptr<transport_factory> transport_factory)
: m_connection(connection_impl::create(url, query_string, trace_level, log_writer,
std::move(web_request_factory), std::move(transport_factory))),m_logger(log_writer, trace_level),
m_callback_manager(json::value::parse(_XPLATSTR("{ \"error\" : \"connection went out of scope before invocation result was received\"}")))
m_callback_manager(json::value::parse(_XPLATSTR("{ \"error\" : \"connection went out of scope before invocation result was received\"}"))),
m_disconnected([]() {})
{ }
void hub_connection_impl::initialize()
@ -54,7 +55,7 @@ namespace signalr
// weak_ptr prevents a circular dependency leading to memory leak and other problems
auto weak_hub_connection = std::weak_ptr<hub_connection_impl>(this_hub_connection);
m_connection->set_message_received_json([weak_hub_connection](const web::json::value& message)
m_connection->set_message_received([weak_hub_connection](const utility::string_t& message)
{
auto connection = weak_hub_connection.lock();
if (connection)
@ -62,6 +63,16 @@ namespace signalr
connection->process_message(message);
}
});
m_connection->set_disconnected([weak_hub_connection]()
{
auto connection = weak_hub_connection.lock();
if (connection)
{
connection->m_handshakeTask.set_exception(signalr_exception(_XPLATSTR("connection closed while handshake was in progress.")));
connection->m_disconnected();
}
});
}
void hub_connection_impl::on(const utility::string_t& event_name, const std::function<void(const json::value &)>& handler)
@ -89,7 +100,49 @@ namespace signalr
pplx::task<void> hub_connection_impl::start()
{
return m_connection->start();
if (m_connection->get_connection_state() != connection_state::disconnected)
{
throw signalr_exception(
_XPLATSTR("the connection can only be started if it is in the disconnected state"));
}
m_handshakeTask = pplx::task_completion_event<void>();
m_handshakeReceived = false;
auto weak_connection = m_connection->weak_from_this();
return m_connection->start()
.then([weak_connection, this](pplx::task<void> startTask)
{
startTask.get();
auto connection = weak_connection.lock();
if (!connection)
{
// The connection has been destructed
return pplx::task_from_exception<void>(signalr_exception(_XPLATSTR("the hub connection has been deconstructed")));
}
return connection->send(_XPLATSTR("{\"protocol\":\"json\",\"version\":1}\x1e"))
.then([this](pplx::task<void> previous_task)
{
previous_task.get();
return pplx::task<void>(m_handshakeTask);
})
.then([weak_connection](pplx::task<void> previous_task)
{
try
{
previous_task.get();
return previous_task;
}
catch (std::exception)
{
auto connection = weak_connection.lock();
if (connection)
{
connection->stop();
}
throw;
}
});
});
}
pplx::task<void> hub_connection_impl::stop()
@ -98,27 +151,108 @@ namespace signalr
return m_connection->stop();
}
void hub_connection_impl::process_message(const web::json::value& message)
enum MessageType
{
auto type = message.at(_XPLATSTR("type")).as_integer();
if (type == 3)
{
invoke_callback(message);
return;
}
else if (type == 1)
{
auto method = message.at(_XPLATSTR("target")).as_string();
auto event = m_subscriptions.find(method);
if (event != m_subscriptions.end())
{
event->second(message.at(_XPLATSTR("arguments")));
}
return;
}
Invocation = 1,
StreamItem,
Completion,
StreamInvocation,
CancelInvocation,
Ping,
Close,
};
m_logger.log(trace_level::info, utility::string_t(_XPLATSTR("non-hub message received and will be discarded. message: "))
.append(message.serialize()));
void hub_connection_impl::process_message(const utility::string_t& response)
{
try
{
auto pos = response.find('\x1e');
std::size_t lastPos = 0;
while (pos != utility::string_t::npos)
{
auto message = response.substr(lastPos, pos - lastPos);
const auto result = web::json::value::parse(message);
if (!result.is_object())
{
m_logger.log(trace_level::info, utility::string_t(_XPLATSTR("unexpected response received from the server: "))
.append(message));
return;
}
if (!m_handshakeReceived)
{
if (result.has_field(_XPLATSTR("error")))
{
auto error = result.at(_XPLATSTR("error")).as_string();
m_logger.log(trace_level::errors, utility::string_t(_XPLATSTR("handshake error: "))
.append(error));
m_handshakeTask.set_exception(signalr_exception(utility::string_t(_XPLATSTR("Received an error during handshake: ")).append(error)));
return;
}
else
{
if (result.has_field(_XPLATSTR("type")))
{
m_handshakeTask.set_exception(signalr_exception(utility::string_t(_XPLATSTR("Received unexpected message while waiting for the handshake response."))));
}
m_handshakeReceived = true;
m_handshakeTask.set();
return;
}
}
auto messageType = result.at(_XPLATSTR("type"));
switch (messageType.as_integer())
{
case MessageType::Invocation:
{
auto method = result.at(_XPLATSTR("target")).as_string();
auto event = m_subscriptions.find(method);
if (event != m_subscriptions.end())
{
event->second(result.at(_XPLATSTR("arguments")));
}
break;
}
case MessageType::StreamInvocation:
// Sent to server only, should not be received by client
throw std::runtime_error("Received unexpected message type 'StreamInvocation'.");
case MessageType::StreamItem:
// TODO
break;
case MessageType::Completion:
{
if (result.has_field(_XPLATSTR("error")) && result.has_field(_XPLATSTR("result")))
{
// TODO: error
}
invoke_callback(result);
break;
}
case MessageType::CancelInvocation:
// Sent to server only, should not be received by client
throw std::runtime_error("Received unexpected message type 'CancelInvocation'.");
case MessageType::Ping:
// TODO
break;
case MessageType::Close:
// TODO
break;
}
lastPos = pos + 1;
pos = response.find('\x1e', lastPos);
}
}
catch (const std::exception &e)
{
m_logger.log(trace_level::errors, utility::string_t(_XPLATSTR("error occured when parsing response: "))
.append(utility::conversions::to_string_t(e.what()))
.append(_XPLATSTR(". response: "))
.append(response));
}
}
bool hub_connection_impl::invoke_callback(const web::json::value& message)
@ -144,7 +278,7 @@ namespace signalr
[tce](const std::exception_ptr e) { tce.set_exception(e); }));
invoke_hub_method(method_name, arguments, callback_id, nullptr,
[tce](const std::exception_ptr e){tce.set_exception(e); });
[tce](const std::exception_ptr e){ tce.set_exception(e); });
return pplx::create_task(tce);
}
@ -220,7 +354,7 @@ namespace signalr
void hub_connection_impl::set_disconnected(const std::function<void()>& disconnected)
{
m_connection->set_disconnected(disconnected);
m_disconnected = disconnected;
}
// unnamed namespace makes it invisble outside this translation unit

View File

@ -54,10 +54,13 @@ namespace signalr
logger m_logger;
callback_manager m_callback_manager;
std::unordered_map<utility::string_t, std::function<void(const json::value &)>, case_insensitive_hash, case_insensitive_equals> m_subscriptions;
bool m_handshakeReceived;
pplx::task_completion_event<void> m_handshakeTask;
std::function<void()> m_disconnected;
void initialize();
void process_message(const web::json::value& message);
void process_message(const utility::string_t& message);
void invoke_hub_method(const utility::string_t& method_name, const json::value& arguments, const utility::string_t& callback_id,
std::function<void()> set_completion, std::function<void(const std::exception_ptr)> set_exception);

View File

@ -191,63 +191,65 @@ TEST(connection_impl_start, start_fails_if_transport_connect_throws)
}
// TODO
//TEST(connection_impl_start, start_fails_if_no_available_transports)
//{
// auto web_request_factory = std::make_unique<test_web_request_factory>([](const web::uri &) -> std::unique_ptr<web_request>
// {
// auto response_body =
// _XPLATSTR("{ \"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
// _XPLATSTR("\"availableTransports\" : [] }");
//
// return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
// });
//
// auto websocket_client = std::make_shared<test_websocket_client>();
// auto connection =
// connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::errors, std::make_shared<trace_log_writer>(),
// std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
//
// try
// {
// connection->start().get();
// ASSERT_TRUE(false); // exception not thrown
// }
// catch (const std::exception &e)
// {
// ASSERT_EQ(_XPLATSTR("websockets not supported on the server and there is no fallback transport"),
// utility::conversions::to_string_t(e.what()));
// }
//}
TEST(connection_impl_start, DISABLED_start_fails_if_no_available_transports)
{
auto web_request_factory = std::make_unique<test_web_request_factory>([](const web::uri &) -> std::unique_ptr<web_request>
{
auto response_body =
_XPLATSTR("{ \"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"availableTransports\" : [] }");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
auto websocket_client = std::make_shared<test_websocket_client>();
auto connection =
connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::errors, std::make_shared<trace_log_writer>(),
std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
try
{
connection->start().get();
ASSERT_TRUE(false); // exception not thrown
}
catch (const std::exception &e)
{
ASSERT_EQ(_XPLATSTR("websockets not supported on the server and there is no fallback transport"),
utility::conversions::to_string_t(e.what()));
}
}
#if defined(_WIN32) // https://github.com/aspnet/SignalR-Client-Cpp/issues/131
TEST(connection_impl_start, start_fails_if_transport_fails_when_receiving_messages)
TEST(connection_impl_send, send_fails_if_transport_fails_when_receiving_messages)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto websocket_client = create_test_websocket_client(
/* receive function */ []()
auto websocket_client = create_test_websocket_client([]() { return pplx::task_from_result(std::string("")); },
/* send function */ [](const utility::string_t &)
{
return pplx::task_from_exception<std::string>(std::runtime_error("receive error"));
return pplx::task_from_exception<void>(std::runtime_error("send error"));
});
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->start().get();
try
{
connection->start().get();
connection->send(_XPLATSTR("message")).get();
ASSERT_TRUE(false); // exception not thrown
}
catch (const std::exception &e)
{
ASSERT_EQ(_XPLATSTR("receive error"), utility::conversions::to_string_t(e.what()));
ASSERT_EQ(_XPLATSTR("send error"), utility::conversions::to_string_t(e.what()));
}
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_TRUE(log_entries.size() > 1) << dump_vector(log_entries);
ASSERT_TRUE(log_entries.size() == 1) << dump_vector(log_entries);
auto entry = remove_date_from_log_entry(log_entries[1]);
ASSERT_EQ(_XPLATSTR("[error ] connection could not be started due to: receive error\n"), entry) << dump_vector(log_entries);
auto entry = remove_date_from_log_entry(log_entries[0]);
ASSERT_EQ(_XPLATSTR("[error ] error sending data: send error\n"), entry) << dump_vector(log_entries);
}
#endif
@ -297,10 +299,11 @@ TEST(connection_impl_start, start_fails_if_connect_request_times_out)
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
pplx::task_completion_event<void> tce;
auto websocket_client = std::make_shared<test_websocket_client>();
websocket_client->set_receive_function([]()->pplx::task<std::string>
websocket_client->set_connect_function([tce](const web::uri&) mutable
{
return pplx::task_from_result(std::string("{}"));
return pplx::task<void>(tce);
});
auto connection =
@ -321,17 +324,24 @@ TEST(connection_impl_start, start_fails_if_connect_request_times_out)
TEST(connection_impl_process_response, process_response_logs_messages)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto wait_receive = std::make_shared<event>();
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
/* receive function */ [wait_receive]()
{
wait_receive->set();
return pplx::task_from_result(std::string("{ }"));
});
auto connection = create_connection(websocket_client, writer, trace_level::messages);
connection->start().get();
// Need to give the receive loop time to run
std::make_shared<event>()->wait(1000);
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_FALSE(log_entries.empty());
auto entry = remove_date_from_log_entry(log_entries[0]);
ASSERT_EQ(_XPLATSTR("[message ] processing message: { }\x1e\n"), entry);
ASSERT_EQ(_XPLATSTR("[message ] processing message: { }\n"), entry);
}
TEST(connection_impl_send, message_sent)
@ -378,30 +388,14 @@ TEST(connection_impl_send, send_throws_if_connection_not_connected)
TEST(connection_impl_send, exceptions_from_send_logged_and_propagated)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
int call_number = -1;
bool hasSentHandshake = false;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]()
mutable {
std::string responses[]
{
"{ }\x1e",
"{}"
};
call_number = std::min(call_number + 1, 1);
return pplx::task_from_result(responses[call_number]);
},
/* send function */ [&hasSentHandshake](const utility::string_t&)
/* receive function */ []()
{
if (hasSentHandshake)
{
return pplx::task_from_exception<void>(std::runtime_error("error"));
}
hasSentHandshake = true;
return pplx::task_from_result();
return pplx::task_from_result(std::string("{}"));
},
/* send function */ [](const utility::string_t&)
{
return pplx::task_from_exception<void>(std::runtime_error("error"));
});
auto connection = create_connection(websocket_client, writer, trace_level::errors);
@ -436,13 +430,12 @@ TEST(connection_impl_set_message_received, callback_invoked_when_message_receive
mutable {
std::string responses[]
{
"{ }\x1e",
"{ \"type\": 1, \"target\": \"something\", \"arguments\" : [\"Test\"] }\x1e",
"{ \"type\": 1, \"target\": \"something\", \"arguments\" : [\"release\"] }\x1e",
"Test",
"release",
"{}"
};
call_number = std::min(call_number + 1, 3);
call_number = std::min(call_number + 1, 2);
return pplx::task_from_result(responses[call_number]);
});
@ -452,15 +445,14 @@ TEST(connection_impl_set_message_received, callback_invoked_when_message_receive
auto message = std::make_shared<utility::string_t>();
auto message_received_event = std::make_shared<event>();
connection->set_message_received_string([message, message_received_event](const utility::string_t &m)
connection->set_message_received([message, message_received_event](const utility::string_t &m)
{
auto value = web::json::value::parse(m).at(_XPLATSTR("arguments")).as_array()[0].as_string();
if (value == _XPLATSTR("Test"))
if (m == _XPLATSTR("Test"))
{
*message = value;
*message = m;
}
if (value == _XPLATSTR("release"))
if (m == _XPLATSTR("release"))
{
message_received_event->set();
}
@ -481,13 +473,12 @@ TEST(connection_impl_set_message_received, exception_from_callback_caught_and_lo
mutable {
std::string responses[]
{
"{ }\x1e",
"{ \"type\": 1, \"target\": \"something\", \"arguments\" : [\"throw\"] }\x1e",
"{ \"type\": 1, \"target\": \"something\", \"arguments\" : [\"release\"] }\x1e",
"throw",
"release",
"{}"
};
call_number = std::min(call_number + 1, 3);
call_number = std::min(call_number + 1, 2);
return pplx::task_from_result(responses[call_number]);
});
@ -496,15 +487,14 @@ TEST(connection_impl_set_message_received, exception_from_callback_caught_and_lo
auto connection = create_connection(websocket_client, writer, trace_level::errors);
auto message_received_event = std::make_shared<event>();
connection->set_message_received_string([message_received_event](const utility::string_t &m)
connection->set_message_received([message_received_event](const utility::string_t &m)
{
auto value = web::json::value::parse(m).at(_XPLATSTR("arguments")).as_array()[0].as_string();
if (value == _XPLATSTR("throw"))
if (m == _XPLATSTR("throw"))
{
throw std::runtime_error("oops");
}
if (value == _XPLATSTR("release"))
if (m == _XPLATSTR("release"))
{
message_received_event->set();
}
@ -529,13 +519,12 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh
mutable {
std::string responses[]
{
"{ }\x1e",
"{ \"type\": 1, \"target\": \"something\", \"arguments\" : [\"throw\"] }\x1e",
"{ \"type\": 1, \"target\": \"something\", \"arguments\" : [\"release\"] }\x1e",
"throw",
"release",
"{}"
};
call_number = std::min(call_number + 1, 3);
call_number = std::min(call_number + 1, 2);
return pplx::task_from_result(responses[call_number]);
});
@ -544,15 +533,14 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh
auto connection = create_connection(websocket_client, writer, trace_level::errors);
auto message_received_event = std::make_shared<event>();
connection->set_message_received_string([message_received_event](const utility::string_t &m)
connection->set_message_received([message_received_event](const utility::string_t &m)
{
auto value = web::json::value::parse(m).at(_XPLATSTR("arguments")).as_array()[0].as_string();
if (value == _XPLATSTR("throw"))
if (m == _XPLATSTR("throw"))
{
throw 42;
}
if (value == _XPLATSTR("release"))
if (m == _XPLATSTR("release"))
{
message_received_event->set();
}
@ -569,7 +557,7 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh
ASSERT_EQ(_XPLATSTR("[error ] message_received callback threw an unknown exception\n"), entry);
}
TEST(connection_impl_set_message_received, error_logged_for_malformed_payload)
TEST(connection_impl_set_message_received, DISABLED_error_logged_for_malformed_payload)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
@ -592,7 +580,7 @@ TEST(connection_impl_set_message_received, error_logged_for_malformed_payload)
auto connection = create_connection(websocket_client, writer, trace_level::errors);
auto message_received_event = std::make_shared<event>();
connection->set_message_received_string([message_received_event](const utility::string_t&)
connection->set_message_received([message_received_event](const utility::string_t&)
{
// this is called only once because we have just one response with a message
message_received_event->set();
@ -609,7 +597,7 @@ TEST(connection_impl_set_message_received, error_logged_for_malformed_payload)
ASSERT_EQ(_XPLATSTR("[error ] error occured when parsing response: * Line 1, Column 4 Syntax error: Malformed object literal. response: { 42\x1e\n"), entry);
}
TEST(connection_impl_set_message_received, unexpected_responses_logged)
TEST(connection_impl_set_message_received, DISABLED_unexpected_responses_logged)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
@ -632,7 +620,7 @@ TEST(connection_impl_set_message_received, unexpected_responses_logged)
auto connection = create_connection(websocket_client, writer, trace_level::info);
auto message_received_event = std::make_shared<event>();
connection->set_message_received_string([message_received_event](const utility::string_t&)
connection->set_message_received([message_received_event](const utility::string_t&)
{
// this is called only once because we have just one response with a message
message_received_event->set();
@ -668,17 +656,10 @@ void can_be_set_only_in_disconnected_state(std::function<void(connection_impl *)
}
}
TEST(connection_impl_set_configuration, set_message_received_string_callback_can_be_set_only_in_disconnected_state)
TEST(connection_impl_set_configuration, set_message_received_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_message_received_string([](const utility::string_t&){}); },
"cannot set the callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_message_received_json_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_message_received_json([](const web::json::value&){}); },
[](connection_impl* connection) { connection->set_message_received([](const utility::string_t&){}); },
"cannot set the callback when the connection is not in the disconnected state. current connection state: connected");
}
@ -768,8 +749,7 @@ TEST(connection_impl_stop, can_start_and_stop_connection)
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[3]));
}
// Flaky test: "transport timed out when trying to connect"
TEST(connection_impl_stop, DISABLED_can_start_and_stop_connection_multiple_times)
TEST(connection_impl_stop, can_start_and_stop_connection_multiple_times)
{
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
@ -881,7 +861,7 @@ TEST(connection_impl_stop, stop_cancels_ongoing_start_request)
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[info ] stopping connection\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[info ] acquired lock in shutdown()\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[info ] starting the connection has been cancelled.\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[info ] starting the connection has been canceled.\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> disconnected\n"), remove_date_from_log_entry(log_entries[4]));
}
@ -926,7 +906,7 @@ TEST(connection_impl_stop, ongoing_start_request_canceled_if_connection_stopped_
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[info ] stopping connection\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[info ] acquired lock in shutdown()\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[info ] starting the connection has been cancelled.\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[info ] starting the connection has been canceled.\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> disconnected\n"), remove_date_from_log_entry(log_entries[4]));
}

View File

@ -58,6 +58,92 @@ TEST(start, start_starts_connection)
ASSERT_EQ(connection_state::connected, hub_connection->get_connection_state());
}
TEST(start, start_sends_handshake)
{
auto message = std::make_shared<utility::string_t>();
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */ [message](const utility::string_t& msg) { *message = msg; return pplx::task_from_result(); });
auto hub_connection = create_hub_connection(websocket_client);
hub_connection->start().get();
ASSERT_EQ(_XPLATSTR("{\"protocol\":\"json\",\"version\":1}\x1e"), *message);
ASSERT_EQ(connection_state::connected, hub_connection->get_connection_state());
}
TEST(start, start_waits_for_handshake_response)
{
pplx::task_completion_event<void> tce;
pplx::task_completion_event<void> tceWaitForSend;
auto websocket_client = create_test_websocket_client(
/* receive function */ [tce, tceWaitForSend]()
{
tceWaitForSend.set();
pplx::task<void>(tce).get();
return pplx::task_from_result(std::string("{ }\x1e"));
});
auto hub_connection = create_hub_connection(websocket_client);
auto startTask = hub_connection->start();
pplx::task<void>(tceWaitForSend).get();
ASSERT_FALSE(startTask.is_done());
tce.set();
startTask.get();
ASSERT_EQ(connection_state::connected, hub_connection->get_connection_state());
}
TEST(start, start_fails_for_handshake_response_with_error)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{\"error\":\"bad things\"}\x1e")); });
auto hub_connection = create_hub_connection(websocket_client);
try
{
hub_connection->start().get();
ASSERT_TRUE(false);
}
catch (std::exception ex)
{
ASSERT_STREQ("Received an error during handshake: bad things", ex.what());
}
ASSERT_EQ(connection_state::disconnected, hub_connection->get_connection_state());
}
TEST(start, start_fails_if_stop_called_before_handshake_response)
{
pplx::task_completion_event<std::string> tce;
pplx::task_completion_event<void> tceWaitForSend;
auto websocket_client = create_test_websocket_client(
/* receive function */ [tce]() { return pplx::task<std::string>(tce); },
/* send function */ [tceWaitForSend](const utility::string_t &)
{
tceWaitForSend.set();
return pplx::task_from_result();
});
auto hub_connection = create_hub_connection(websocket_client);
auto startTask = hub_connection->start();
pplx::task<void>(tceWaitForSend).get();
hub_connection->stop();
try
{
startTask.get();
ASSERT_TRUE(false);
}
catch (std::exception ex)
{
ASSERT_STREQ("connection closed while handshake was in progress.", ex.what());
}
ASSERT_EQ(connection_state::disconnected, hub_connection->get_connection_state());
}
TEST(stop, stop_stops_connection)
{
auto websocket_client = create_test_websocket_client(
@ -85,8 +171,7 @@ TEST(stop, disconnected_callback_called_when_hub_connection_stops)
ASSERT_TRUE(disconnected_invoked);
}
// Flaky test: ASSERT_EQ(4U, log_entries.size()) was 3
TEST(stop, DISABLED_connection_stopped_when_going_out_of_scope)
TEST(stop, connection_stopped_when_going_out_of_scope)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
@ -109,7 +194,7 @@ TEST(stop, DISABLED_connection_stopped_when_going_out_of_scope)
}
auto log_entries = memory_writer->get_log_entries();
ASSERT_EQ(4U, log_entries.size());
ASSERT_EQ(4U, log_entries.size()) << dump_vector(log_entries);
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> disconnecting\n"), remove_date_from_log_entry(log_entries[2]));
@ -224,17 +309,22 @@ TEST(hub_invocation, hub_connection_invokes_users_code_on_hub_invocations)
ASSERT_EQ(_XPLATSTR("[\"message\",1]"), *payload);
}
// Flaky test: "error" thrown in test body
TEST(invoke, DISABLED_invoke_creates_correct_payload)
TEST(invoke, invoke_creates_correct_payload)
{
utility::string_t payload;
bool handshakeReceived = false;
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */[&payload](const utility::string_t& m)
/* send function */[&payload, &handshakeReceived](const utility::string_t& m)
{
payload = m;
return pplx::task_from_exception<void>(std::runtime_error("error"));
if (handshakeReceived)
{
payload = m;
return pplx::task_from_exception<void>(std::runtime_error("error"));
}
handshakeReceived = true;
return pplx::task_from_result();
});
auto hub_connection = create_hub_connection(websocket_client);
@ -252,12 +342,20 @@ TEST(invoke, DISABLED_invoke_creates_correct_payload)
ASSERT_EQ(_XPLATSTR("{\"arguments\":[],\"target\":\"method\",\"type\":1}\x1e"), payload);
}
// "error" thrown in test body
TEST(invoke, DISABLED_callback_not_called_if_send_throws)
TEST(invoke, callback_not_called_if_send_throws)
{
bool handshakeReceived = false;
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */[](const utility::string_t&) { return pplx::task_from_exception<void>(std::runtime_error("error")); });
/* send function */[handshakeReceived](const utility::string_t&) mutable
{
if (handshakeReceived)
{
return pplx::task_from_exception<void>(std::runtime_error("error"));
}
handshakeReceived = true;
return pplx::task_from_result();
});
auto hub_connection = create_hub_connection(websocket_client);
hub_connection->start().get();
@ -437,14 +535,14 @@ TEST(invoke_void, invoke_unblocks_task_when_server_completes_call)
ASSERT_TRUE(true);
}
// Flaky test: crashes test process
TEST(invoke_void, DISABLED_invoke_logs_if_callback_for_given_id_not_found)
TEST(invoke_void, invoke_logs_if_callback_for_given_id_not_found)
{
auto message_received_event = std::make_shared<event>();
auto handshake_sent = std::make_shared<event>();
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, message_received_event]()
/* receive function */ [call_number, message_received_event, handshake_sent]()
mutable {
std::string responses[]
{
@ -453,6 +551,8 @@ TEST(invoke_void, DISABLED_invoke_logs_if_callback_for_given_id_not_found)
"{}"
};
handshake_sent->wait(1000);
call_number = std::min(call_number + 1, 2);
if (call_number > 1)
@ -461,6 +561,11 @@ TEST(invoke_void, DISABLED_invoke_logs_if_callback_for_given_id_not_found)
}
return pplx::task_from_result(responses[call_number]);
},
[handshake_sent](const utility::string_t&)
{
handshake_sent->set();
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
@ -472,8 +577,8 @@ TEST(invoke_void, DISABLED_invoke_logs_if_callback_for_given_id_not_found)
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_TRUE(log_entries.size() > 1);
auto entry = remove_date_from_log_entry(log_entries[1]);
ASSERT_EQ(_XPLATSTR("[info ] no callback found for id: 0\n"), entry);
auto entry = remove_date_from_log_entry(log_entries[2]);
ASSERT_EQ(_XPLATSTR("[info ] no callback found for id: 0\n"), entry) << dump_vector(log_entries);
}
// TODO Flaky until hub_connection.start waits for handshake response
@ -566,97 +671,6 @@ TEST(invoke_void, DISABLED_invoke_propagates_hub_errors_from_server_as_hub_excep
}
}
// TODO Flaky until hub_connection.start waits for handshake response
TEST(invoke_void, DISABLED_invoke_creates_hub_exception_even_if_no_error_data)
{
auto callback_registered_event = std::make_shared<event>();
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, callback_registered_event]()
mutable {
std::string responses[]
{
"{ }\x1e",
"{ \"type\": 3, \"invocationId\": \"0\", \"error\": \"Ooops\" }\x1e"
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number > 0)
{
callback_registered_event->wait();
}
return pplx::task_from_result(responses[call_number]);
});
auto hub_connection = create_hub_connection(websocket_client);
try
{
hub_connection->start()
.then([hub_connection, callback_registered_event]()
{
auto t = hub_connection->invoke_void(_XPLATSTR("method"), json::value::array());
callback_registered_event->set();
return t;
}).get();
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const hub_exception& e)
{
ASSERT_STREQ("\"Ooops\"", e.what());
}
}
// TODO Flaky until hub_connection.start waits for handshake response
TEST(invoke_void, DISABLED_invoke_creates_runtime_error_when_hub_exception_indicator_false)
{
auto callback_registered_event = std::make_shared<event>();
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, callback_registered_event]()
mutable {
std::string responses[]
{
"{ }\x1e",
"{ \"type\": 3, \"invocationId\": \"0\", \"error\": \"Ooops\" }\x1e"
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number > 0)
{
callback_registered_event->wait();
}
return pplx::task_from_result(responses[call_number]);
});
auto hub_connection = create_hub_connection(websocket_client);
try
{
hub_connection->start()
.then([hub_connection, callback_registered_event]()
{
auto t = hub_connection->invoke_void(_XPLATSTR("method"), json::value::array());
callback_registered_event->set();
return t;
}).get();
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const signalr_exception& e)
{
ASSERT_STREQ("\"Ooops\"", e.what());
ASSERT_TRUE(dynamic_cast<const hub_exception *>(&e) == nullptr);
}
}
// TODO Flaky until hub_connection.start waits for handshake response
TEST(invoke_void, DISABLED_invoke_creates_runtime_error)
{

View File

@ -8,7 +8,7 @@
using namespace web;
using namespace signalr;
TEST(web_request_get_response, sends_request_receives_response)
TEST(web_request_get_response, DISABLED_sends_request_receives_response)
{
web::uri url(_XPLATSTR("http://localhost:56000/web_request_test"));
auto request_received = false;