Remove reconnect from C++ Client (#7094)

This commit is contained in:
BrennanConroy 2019-01-31 23:21:38 -08:00 committed by GitHub
parent aca9bffd23
commit 1a61a58c51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 171 additions and 1616 deletions

View File

@ -35,8 +35,6 @@ namespace signalr
SIGNALRCLIENT_API pplx::task<void> __cdecl send(const utility::string_t& data);
SIGNALRCLIENT_API void __cdecl set_message_received(const message_received_handler& message_received_callback);
SIGNALRCLIENT_API void __cdecl set_reconnecting(const std::function<void __cdecl()>& reconnecting_callback);
SIGNALRCLIENT_API void __cdecl set_reconnected(const std::function<void __cdecl()>& reconnected_callback);
SIGNALRCLIENT_API void __cdecl set_disconnected(const std::function<void __cdecl()>& disconnected_callback);
SIGNALRCLIENT_API void __cdecl set_client_config(const signalr_client_config& config);
@ -53,4 +51,4 @@ namespace signalr
// a deleted object if the task is run after the `connection` instance goes away.
std::shared_ptr<connection_impl> m_pImpl;
};
}
}

View File

@ -9,8 +9,7 @@ namespace signalr
{
connecting,
connected,
reconnecting,
disconnecting,
disconnected
};
}
}

View File

@ -23,7 +23,7 @@ namespace signalr
typedef std::function<void __cdecl (const web::json::value&)> method_invoked_handler;
SIGNALRCLIENT_API explicit hub_connection(const utility::string_t& url, const utility::string_t& query_string = _XPLATSTR(""),
trace_level trace_level = trace_level::all, std::shared_ptr<log_writer> log_writer = nullptr, bool use_default_url = true);
trace_level trace_level = trace_level::all, std::shared_ptr<log_writer> log_writer = nullptr);
SIGNALRCLIENT_API ~hub_connection();
@ -37,8 +37,6 @@ namespace signalr
SIGNALRCLIENT_API connection_state __cdecl get_connection_state() const;
SIGNALRCLIENT_API utility::string_t __cdecl get_connection_id() const;
SIGNALRCLIENT_API void __cdecl set_reconnecting(const std::function<void __cdecl()>& reconnecting_callback);
SIGNALRCLIENT_API void __cdecl set_reconnected(const std::function<void __cdecl()>& reconnected_callback);
SIGNALRCLIENT_API void __cdecl set_disconnected(const std::function<void __cdecl()>& disconnected_callback);
SIGNALRCLIENT_API void __cdecl set_client_config(const signalr_client_config& config);
@ -61,4 +59,4 @@ namespace signalr
SIGNALRCLIENT_API pplx::task<web::json::value> __cdecl invoke_json(const utility::string_t& method_name, const web::json::value& arguments);
SIGNALRCLIENT_API pplx::task<void> __cdecl invoke_void(const utility::string_t& method_name, const web::json::value& arguments);
};
}
}

View File

@ -14,16 +14,8 @@ namespace signalr
class hub_exception : public signalr_exception
{
public:
hub_exception(const utility::string_t &what, const web::json::value& error_data)
: signalr_exception(what), m_error_data(error_data)
hub_exception(const utility::string_t &what)
: signalr_exception(what)
{}
web::json::value error_data() const
{
return m_error_data;
}
private:
web::json::value m_error_data;
};
}
}

View File

@ -31,16 +31,6 @@ namespace signalr
m_pImpl->set_message_received_string(message_received_callback);
}
void connection::set_reconnecting(const std::function<void()>& reconnecting_callback)
{
m_pImpl->set_reconnecting(reconnecting_callback);
}
void connection::set_reconnected(const std::function<void()>& reconnected_callback)
{
m_pImpl->set_reconnected(reconnected_callback);
}
void connection::set_disconnected(const std::function<void()>& disconnected_callback)
{
m_pImpl->set_disconnected(disconnected_callback);
@ -65,4 +55,4 @@ namespace signalr
{
return m_pImpl->get_connection_id();
}
}
}

View File

@ -37,10 +37,9 @@ namespace signalr
connection_impl::connection_impl(const utility::string_t& url, const utility::string_t& query_string, trace_level trace_level, const std::shared_ptr<log_writer>& log_writer,
std::unique_ptr<web_request_factory> web_request_factory, std::unique_ptr<transport_factory> transport_factory)
: m_base_url(url), m_query_string(query_string), m_connection_state(connection_state::disconnected), m_reconnect_delay(2000),
m_logger(log_writer, trace_level), m_transport(nullptr), m_web_request_factory(std::move(web_request_factory)),
m_transport_factory(std::move(transport_factory)), m_message_received([](const web::json::value&){}),
m_reconnecting([](){}), m_reconnected([](){}), m_disconnected([](){}), m_handshakeReceived(false)
: m_base_url(url), m_query_string(query_string), m_connection_state(connection_state::disconnected), m_logger(log_writer, trace_level),
m_transport(nullptr), m_web_request_factory(std::move(web_request_factory)), m_transport_factory(std::move(transport_factory)),
m_message_received([](const web::json::value&){}), m_disconnected([](){}), m_handshakeReceived(false)
{ }
connection_impl::~connection_impl()
@ -50,11 +49,6 @@ namespace signalr
// Signaling the event is safe here. We are in the dtor so noone is using this instance. There might be some
// outstanding threads that hold on to the connection via a weak pointer but they won't be able to acquire
// the instance since it is being destroyed. Note that the event may actually be in non-signaled state here.
// This for instance happens when the connection goes out of scope while a reconnect is in progress. In this
// case the reconnect logic will not be able to acquire the connection instance from the weak_pointer to
// signal the event so this dtor would hang indefinitely. Using a shared_ptr to the connection in reconnect
// is not a good idea since it would prevent from invoking this dtor until the connection is reconnected or
// reconnection fails even if the instance actually went out of scope.
m_start_completed_event.set();
shutdown().get();
}
@ -206,12 +200,6 @@ namespace signalr
// no op after connection started successfully
connect_request_tce.set_exception(e);
auto connection = weak_connection.lock();
if (connection)
{
connection->reconnect();
}
};
auto transport = connection->m_transport_factory->create_transport(
@ -490,7 +478,7 @@ namespace signalr
return pplx::create_task([](){}, cts.get_token());
}
// we request a cancellation of the ongoing start or reconnect request (if any) and wait until it is cancelled
// we request a cancellation of the ongoing start (if any) and wait until it is canceled
m_disconnect_cts.cancel();
while (m_start_completed_event.wait(60000) != 0)
@ -499,14 +487,14 @@ namespace signalr
_XPLATSTR("internal error - stopping the connection is still waiting for the start operation to finish which should have already finished or timed out"));
}
// at this point we are either in the connected, reconnecting or disconnected state. If we are in the disconnected state
// at this point we are either in the connected or disconnected state. If we are in the disconnected state
// we must break because the transport has already been nulled out.
if (m_connection_state == connection_state::disconnected)
{
return pplx::task_from_result();
}
_ASSERTE(m_connection_state == connection_state::connected || m_connection_state == connection_state::reconnecting);
_ASSERTE(m_connection_state == connection_state::connected);
change_state(connection_state::disconnecting);
}
@ -514,223 +502,6 @@ namespace signalr
return m_transport->disconnect();
}
void connection_impl::reconnect()
{
m_logger.log(trace_level::info, _XPLATSTR("connection lost - trying to re-establish connection"));
pplx::cancellation_token_source disconnect_cts;
{
std::lock_guard<std::mutex> lock(m_stop_lock);
m_logger.log(trace_level::info, _XPLATSTR("acquired lock before invoking reconnecting callback"));
// reconnect might be called when starting the connection has not finished yet so wait until it is done
// before actually trying to reconnect
while (m_start_completed_event.wait(60000) != 0)
{
m_logger.log(trace_level::errors,
_XPLATSTR("internal error - reconnect is still waiting for the start operation to finish which should have already finished or timed out"));
}
// exit if starting the connection has not completed successfully or there is an ongoing stop request
if (!change_state(connection_state::connected, connection_state::reconnecting))
{
m_logger.log(trace_level::info,
_XPLATSTR("reconnecting cancelled - connection is not in the connected state"));
return;
}
disconnect_cts = m_disconnect_cts;
}
try
{
m_logger.log(trace_level::info, _XPLATSTR("invoking reconnecting callback"));
m_reconnecting();
m_logger.log(trace_level::info, _XPLATSTR("reconnecting callback returned without error"));
}
catch (const std::exception &e)
{
m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("reconnecting callback threw an exception: "))
.append(utility::conversions::to_string_t(e.what())));
}
catch (...)
{
m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("reconnecting callback threw an unknown exception")));
}
{
std::lock_guard<std::mutex> lock(m_stop_lock);
m_logger.log(trace_level::info, _XPLATSTR("acquired lock before starting reconnect logic"));
// This is to prevent a case where a connection was stopped (and possibly restarted and got into a reconnecting
// state) after we changed the state to reconnecting in the original reconnecting request. In this case we have
// the original cts which would have been cancelled by the stop request and we can use it to stop the original
// reconnecting request
if (disconnect_cts.get_token().is_canceled())
{
m_logger.log(trace_level::info,
_XPLATSTR("reconnecting canceled - connection was stopped and restarted after reconnecting started"));
return;
}
// we set the connection to the reconnecting before we invoked the reconnecting callback. If the connection
// state changed from the reconnecting state the user might have stopped/restarted the connection in the
// reconnecting callback or there might have started stopping the connection on the main thread and we should
// not try to continue the reconnect
if (m_connection_state != connection_state::reconnecting)
{
m_logger.log(trace_level::info,
_XPLATSTR("reconnecting canceled - connection is no longer in the reconnecting state"));
return;
}
// re-using the start completed event is safe because you cannot start the connection if it is not in the
// disconnected state. It also make it easier to handle stopping the connection when it is reconnecting.
m_start_completed_event.reset();
}
auto reconnect_url = url_builder::build_reconnect(m_base_url, m_transport->get_transport_type(),
m_message_id, m_groups_token, m_query_string);
auto weak_connection = std::weak_ptr<connection_impl>(shared_from_this());
// this is non-blocking
try_reconnect(reconnect_url, utility::datetime::utc_now().to_interval(), m_reconnect_window, m_reconnect_delay, disconnect_cts)
.then([weak_connection](pplx::task<bool> reconnect_task)
{
// try reconnect does not throw
auto reconnected = reconnect_task.get();
auto connection = weak_connection.lock();
if (!connection)
{
// connection instance went away - nothing to be done
return pplx::task_from_result();
}
if (reconnected)
{
if (!connection->change_state(connection_state::reconnecting, connection_state::connected))
{
connection->m_logger.log(trace_level::errors,
utility::string_t(_XPLATSTR("internal error - transition from an unexpected state. expected state: reconnecting, actual state: "))
.append(translate_connection_state(connection->get_connection_state())));
_ASSERTE(false);
}
// we must set the event before calling into the user code to prevent a deadlock that would happen
// if the user called stop() from the handler
connection->m_start_completed_event.set();
try
{
connection->m_logger.log(trace_level::info, _XPLATSTR("invoking reconnected callback"));
connection->m_reconnected();
connection->m_logger.log(trace_level::info, _XPLATSTR("reconnected callback returned without error"));
}
catch (const std::exception &e)
{
connection->m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("reconnected callback threw an exception: "))
.append(utility::conversions::to_string_t(e.what())));
}
catch (...)
{
connection->m_logger.log(
trace_level::errors, _XPLATSTR("reconnected callback threw an unknown exception"));
}
return pplx::task_from_result();
}
connection->m_start_completed_event.set();
return connection->stop();
});
}
// the assumption is that this function won't throw
pplx::task<bool> connection_impl::try_reconnect(const web::uri& reconnect_url, const utility::datetime::interval_type reconnect_start_time,
int reconnect_window /*milliseconds*/, int reconnect_delay /*milliseconds*/, pplx::cancellation_token_source disconnect_cts)
{
if (disconnect_cts.get_token().is_canceled())
{
log(m_logger, trace_level::info, utility::string_t(_XPLATSTR("reconnecting cancelled - connection is being stopped. line: "))
.append(utility::conversions::to_string_t(std::to_string(__LINE__))));
return pplx::task_from_result<bool>(false);
}
auto weak_connection = std::weak_ptr<connection_impl>(shared_from_this());
auto& logger = m_logger;
return m_transport->connect(reconnect_url)
.then([weak_connection, reconnect_url, reconnect_start_time, reconnect_window, reconnect_delay, logger, disconnect_cts]
(pplx::task<void> reconnect_task)
{
try
{
log(logger, trace_level::info, _XPLATSTR("reconnect attempt starting"));
reconnect_task.get();
log(logger, trace_level::info, _XPLATSTR("reconnect attempt completed successfully"));
return pplx::task_from_result<bool>(true);
}
catch (const std::exception& e)
{
log(logger, trace_level::info, utility::string_t(_XPLATSTR("reconnect attempt failed due to: "))
.append(utility::conversions::to_string_t(e.what())));
}
if (disconnect_cts.get_token().is_canceled())
{
log(logger, trace_level::info, utility::string_t(_XPLATSTR("reconnecting cancelled - connection is being stopped. line: "))
.append(utility::conversions::to_string_t(std::to_string(__LINE__))));
return pplx::task_from_result<bool>(false);
}
auto reconnect_window_end = reconnect_start_time + utility::datetime::from_milliseconds(reconnect_window);
if (utility::datetime::utc_now().to_interval() + utility::datetime::from_milliseconds(reconnect_delay) > reconnect_window_end)
{
utility::ostringstream_t oss;
oss << _XPLATSTR("connection could not be re-established within the configured timeout of ")
<< reconnect_window << _XPLATSTR(" milliseconds");
log(logger, trace_level::info, oss.str());
return pplx::task_from_result<bool>(false);
}
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_delay));
if (disconnect_cts.get_token().is_canceled())
{
log(logger, trace_level::info, utility::string_t(_XPLATSTR("reconnecting cancelled - connection is being stopped. line: "))
.append(utility::conversions::to_string_t(std::to_string(__LINE__))));
return pplx::task_from_result<bool>(false);
}
auto connection = weak_connection.lock();
if (connection)
{
return connection->try_reconnect(reconnect_url, reconnect_start_time, reconnect_window, reconnect_delay, disconnect_cts);
}
log(logger, trace_level::info, _XPLATSTR("reconnecting cancelled - connection no longer valid."));
return pplx::task_from_result<bool>(false);
});
}
connection_state connection_impl::get_connection_state() const
{
return m_connection_state.load();
@ -773,30 +544,12 @@ namespace signalr
m_signalr_client_config = config;
}
void connection_impl::set_reconnecting(const std::function<void()>& reconnecting)
{
ensure_disconnected(_XPLATSTR("cannot set the reconnecting callback when the connection is not in the disconnected state. "));
m_reconnecting = reconnecting;
}
void connection_impl::set_reconnected(const std::function<void()>& reconnected)
{
ensure_disconnected(_XPLATSTR("cannot set the reconnected callback when the connection is not in the disconnected state. "));
m_reconnected = reconnected;
}
void connection_impl::set_disconnected(const std::function<void()>& disconnected)
{
ensure_disconnected(_XPLATSTR("cannot set the disconnected callback when the connection is not in the disconnected state. "));
m_disconnected = disconnected;
}
void connection_impl::set_reconnect_delay(const int reconnect_delay)
{
ensure_disconnected(_XPLATSTR("cannot set reconnect delay when the connection is not in the disconnected state. "));
m_reconnect_delay = reconnect_delay;
}
void connection_impl::ensure_disconnected(const utility::string_t& error_message)
{
auto state = get_connection_state();
@ -851,8 +604,6 @@ namespace signalr
return _XPLATSTR("connecting");
case connection_state::connected:
return _XPLATSTR("connected");
case connection_state::reconnecting:
return _XPLATSTR("reconnecting");
case connection_state::disconnecting:
return _XPLATSTR("disconnecting");
case connection_state::disconnected:
@ -862,13 +613,4 @@ namespace signalr
return _XPLATSTR("(unknown)");
}
}
namespace
{
// this is a workaround for the VS2013 compiler bug where mutable lambdas won't compile sometimes
static void log(const logger& logger, trace_level level, const utility::string_t& entry)
{
const_cast<signalr::logger &>(logger).log(level, entry);
}
}
}

View File

@ -46,11 +46,8 @@ namespace signalr
void set_message_received_string(const std::function<void(const utility::string_t&)>& message_received);
void set_message_received_json(const std::function<void(const web::json::value&)>& message_received);
void set_reconnecting(const std::function<void()>& reconnecting);
void set_reconnected(const std::function<void()>& reconnected);
void set_disconnected(const std::function<void()>& disconnected);
void set_client_config(const signalr_client_config& config);
void set_reconnect_delay(const int reconnect_delay /*milliseconds*/);
void set_connection_data(const utility::string_t& connection_data);
@ -64,8 +61,6 @@ namespace signalr
std::unique_ptr<transport_factory> m_transport_factory;
std::function<void(const web::json::value&)> m_message_received;
std::function<void()> m_reconnecting;
std::function<void()> m_reconnected;
std::function<void()> m_disconnected;
signalr_client_config m_signalr_client_config;
@ -74,8 +69,6 @@ namespace signalr
event m_start_completed_event;
utility::string_t m_connection_id;
utility::string_t m_connection_data;
int m_reconnect_window; // in milliseconds
int m_reconnect_delay; // in milliseconds
utility::string_t m_message_id;
utility::string_t m_groups_token;
bool m_handshakeReceived;
@ -90,9 +83,6 @@ namespace signalr
void process_response(const utility::string_t& response, const pplx::task_completion_event<void>& connect_request_tce);
pplx::task<void> shutdown();
void reconnect();
pplx::task<bool> try_reconnect(const web::uri& reconnect_url, const utility::datetime::interval_type reconnect_start_time,
int reconnect_window, int reconnect_delay, pplx::cancellation_token_source disconnect_cts);
bool change_state(connection_state old_state, connection_state new_state);
connection_state change_state(connection_state new_state);

View File

@ -9,8 +9,8 @@
namespace signalr
{
hub_connection::hub_connection(const utility::string_t& url, const utility::string_t& query_string,
trace_level trace_level, std::shared_ptr<log_writer> log_writer, bool use_default_url)
: m_pImpl(hub_connection_impl::create(url, query_string, trace_level, std::move(log_writer), use_default_url))
trace_level trace_level, std::shared_ptr<log_writer> log_writer)
: m_pImpl(hub_connection_impl::create(url, query_string, trace_level, std::move(log_writer)))
{}
// Do NOT remove this destructor. Letting the compiler generate and inline the default dtor may lead to
@ -67,16 +67,6 @@ namespace signalr
return m_pImpl->get_connection_id();
}
void hub_connection::set_reconnecting(const std::function<void()>& reconnecting_callback)
{
m_pImpl->set_reconnecting(reconnecting_callback);
}
void hub_connection::set_reconnected(const std::function<void()>& reconnected_callback)
{
m_pImpl->set_reconnected(reconnected_callback);
}
void hub_connection::set_disconnected(const std::function<void()>& disconnected_callback)
{
m_pImpl->set_disconnected(disconnected_callback);
@ -86,4 +76,4 @@ namespace signalr
{
m_pImpl->set_client_config(config);
}
}
}

View File

@ -18,24 +18,21 @@ namespace signalr
static std::function<void(const json::value&)> create_hub_invocation_callback(const logger& logger,
const std::function<void(const json::value&)>& set_result,
const std::function<void(const std::exception_ptr e)>& set_exception);
static utility::string_t adapt_url(const utility::string_t& url, bool use_default_url);
}
std::shared_ptr<hub_connection_impl> hub_connection_impl::create(const utility::string_t& url, const utility::string_t& query_string,
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer, bool use_default_url)
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer)
{
return hub_connection_impl::create(url, query_string, trace_level, log_writer, use_default_url,
return hub_connection_impl::create(url, query_string, trace_level, log_writer,
std::make_unique<web_request_factory>(), std::make_unique<transport_factory>());
}
std::shared_ptr<hub_connection_impl> hub_connection_impl::create(const utility::string_t& url, const utility::string_t& query_string,
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer, bool use_default_url,
std::unique_ptr<web_request_factory> web_request_factory, std::unique_ptr<transport_factory> transport_factory)
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer, std::unique_ptr<web_request_factory> web_request_factory,
std::unique_ptr<transport_factory> transport_factory)
{
auto connection = std::shared_ptr<hub_connection_impl>(new hub_connection_impl(url, query_string, trace_level,
log_writer ? log_writer : std::make_shared<trace_log_writer>(), use_default_url,
std::move(web_request_factory), std::move(transport_factory)));
log_writer ? log_writer : std::make_shared<trace_log_writer>(), std::move(web_request_factory), std::move(transport_factory)));
connection->initialize();
@ -43,11 +40,11 @@ namespace signalr
}
hub_connection_impl::hub_connection_impl(const utility::string_t& url, const utility::string_t& query_string, trace_level trace_level,
const std::shared_ptr<log_writer>& log_writer, bool use_default_url, std::unique_ptr<web_request_factory> web_request_factory,
const std::shared_ptr<log_writer>& log_writer, std::unique_ptr<web_request_factory> web_request_factory,
std::unique_ptr<transport_factory> transport_factory)
: m_connection(connection_impl::create(adapt_url(url, use_default_url), query_string, trace_level, log_writer,
: m_connection(connection_impl::create(url, query_string, trace_level, log_writer,
std::move(web_request_factory), std::move(transport_factory))),m_logger(log_writer, trace_level),
m_callback_manager(json::value::parse(_XPLATSTR("{ \"E\" : \"connection went out of scope before invocation result was received\"}")))
m_callback_manager(json::value::parse(_XPLATSTR("{ \"error\" : \"connection went out of scope before invocation result was received\"}")))
{ }
void hub_connection_impl::initialize()
@ -65,8 +62,6 @@ namespace signalr
connection->process_message(message);
}
});
set_reconnecting([](){});
}
void hub_connection_impl::on(const utility::string_t& event_name, const std::function<void(const json::value &)>& handler)
@ -99,7 +94,7 @@ namespace signalr
pplx::task<void> hub_connection_impl::stop()
{
m_callback_manager.clear(json::value::parse(_XPLATSTR("{ \"E\" : \"connection was stopped before invocation result was received\"}")));
m_callback_manager.clear(json::value::parse(_XPLATSTR("{ \"error\" : \"connection was stopped before invocation result was received\"}")));
return m_connection->stop();
}
@ -225,29 +220,6 @@ namespace signalr
m_connection->set_client_config(config);
}
void hub_connection_impl::set_reconnecting(const std::function<void()>& reconnecting)
{
// weak_ptr prevents a circular dependency leading to memory leak and other problems
auto weak_hub_connection = std::weak_ptr<hub_connection_impl>(shared_from_this());
m_connection->set_reconnecting([weak_hub_connection, reconnecting]()
{
auto hub_connection = weak_hub_connection.lock();
if (hub_connection)
{
hub_connection->m_callback_manager.clear(
json::value::parse(_XPLATSTR("{ \"E\" : \"connection has been lost\"}")));
}
reconnecting();
});
}
void hub_connection_impl::set_reconnected(const std::function<void()>& reconnected)
{
m_connection->set_reconnected(reconnected);
}
void hub_connection_impl::set_disconnected(const std::function<void()>& disconnected)
{
m_connection->set_disconnected(disconnected);
@ -270,27 +242,11 @@ namespace signalr
{
set_exception(
std::make_exception_ptr(
signalr_exception(message.at(_XPLATSTR("error")).serialize())));
hub_exception(message.at(_XPLATSTR("error")).serialize())));
}
set_result(json::value::null());
};
}
static utility::string_t adapt_url(const utility::string_t& url, bool use_default_url)
{
if (use_default_url)
{
auto new_url = url;
if (new_url.back() != _XPLATSTR('/'))
{
new_url.append(_XPLATSTR("/"));
}
return new_url;
}
return url;
}
}
}
}

View File

@ -22,10 +22,10 @@ namespace signalr
{
public:
static std::shared_ptr<hub_connection_impl> create(const utility::string_t& url, const utility::string_t& query_string,
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer, bool use_default_url);
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer);
static std::shared_ptr<hub_connection_impl> create(const utility::string_t& url, const utility::string_t& query_string,
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer, bool use_default_url,
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer,
std::unique_ptr<web_request_factory> web_request_factory, std::unique_ptr<transport_factory> transport_factory);
hub_connection_impl(const hub_connection_impl&) = delete;
@ -43,14 +43,12 @@ namespace signalr
utility::string_t get_connection_id() const;
void set_client_config(const signalr_client_config& config);
void set_reconnecting(const std::function<void()>& reconnecting);
void set_reconnected(const std::function<void()>& reconnected);
void set_disconnected(const std::function<void()>& disconnected);
private:
hub_connection_impl(const utility::string_t& url, const utility::string_t& query_string, trace_level trace_level,
const std::shared_ptr<log_writer>& log_writer, bool use_default_url,
std::unique_ptr<web_request_factory> web_request_factory, std::unique_ptr<transport_factory> transport_factory);
const std::shared_ptr<log_writer>& log_writer, std::unique_ptr<web_request_factory> web_request_factory,
std::unique_ptr<transport_factory> transport_factory);
std::shared_ptr<connection_impl> m_connection;
logger m_logger;

View File

@ -80,7 +80,7 @@ namespace signalr
const utility::string_t& connection_data, const utility::string_t& query_string,
const utility::string_t& last_message_id = _XPLATSTR(""), const utility::string_t& groups_token = _XPLATSTR(""))
{
_ASSERTE(command == _XPLATSTR("reconnect") || (last_message_id.length() == 0 && groups_token.length() == 0));
_ASSERTE(last_message_id.length() == 0 && groups_token.length() == 0);
web::uri_builder builder(base_url);
builder.append_path(command);
@ -113,13 +113,6 @@ namespace signalr
//return convert_to_websocket_url(builder, transport).to_uri();
}
web::uri build_reconnect(const web::uri& base_url, transport_type transport, const utility::string_t& last_message_id, const utility::string_t& groups_token,
const utility::string_t& query_string)
{
auto builder = build_uri(base_url, _XPLATSTR("reconnect"), transport, query_string, last_message_id, groups_token);
return convert_to_websocket_url(builder, transport).to_uri();
}
web::uri build_start(const web::uri &base_url, const utility::string_t &query_string)
{
return build_uri(base_url, _XPLATSTR(""), query_string).to_uri();
@ -131,4 +124,4 @@ namespace signalr
return build_uri(base_url, _XPLATSTR("abort"), transport, connection_data, query_string).to_uri();
}
}
}
}

View File

@ -12,8 +12,6 @@ namespace signalr
{
web::uri build_negotiate(const web::uri& base_url, const utility::string_t& query_string);
web::uri build_connect(const web::uri& base_url, transport_type transport, const utility::string_t& query_string);
web::uri build_reconnect(const web::uri& base_url, transport_type transport, const utility::string_t& last_message_id,
const utility::string_t& groups_token, const utility::string_t& query_string);
web::uri build_start(const web::uri& base_url, const utility::string_t& query_string);
web::uri build_abort(const web::uri &base_url, transport_type transport,
const utility::string_t& connection_data, const utility::string_t& query_string);

View File

@ -13,32 +13,10 @@
extern utility::string_t url;
TEST(hub_connection_tests, connection_status_start_stop_start_reconnect)
TEST(hub_connection_tests, connection_status_start_stop_start)
{
auto hub_conn = std::make_shared<signalr::hub_connection>(url);
auto weak_hub_conn = std::weak_ptr<signalr::hub_connection>(hub_conn);
auto reconnecting_event = std::make_shared<signalr::event>();
auto reconnected_event = std::make_shared<signalr::event>();
hub_conn->set_reconnecting([weak_hub_conn, reconnecting_event]()
{
auto conn = weak_hub_conn.lock();
if (conn)
{
ASSERT_EQ(conn->get_connection_state(), signalr::connection_state::reconnecting);
}
reconnecting_event->set();
});
hub_conn->set_reconnected([weak_hub_conn, reconnected_event]()
{
auto conn = weak_hub_conn.lock();
if (conn)
{
ASSERT_EQ(conn->get_connection_state(), signalr::connection_state::connected);
}
reconnected_event->set();
});
hub_conn->start().get();
ASSERT_EQ(hub_conn->get_connection_state(), signalr::connection_state::connected);
@ -48,22 +26,11 @@ TEST(hub_connection_tests, connection_status_start_stop_start_reconnect)
hub_conn->start().get();
ASSERT_EQ(hub_conn->get_connection_state(), signalr::connection_state::connected);
try
{
hub_conn->send(U("forceReconnect")).get();
}
catch (...)
{
}
ASSERT_FALSE(reconnecting_event->wait(2000));
ASSERT_FALSE(reconnected_event->wait(2000));
}
TEST(hub_connection_tests, send_message)
{
auto hub_conn = std::make_shared<signalr::hub_connection>(url + U("custom"), U(""), signalr::trace_level::all, nullptr, false);
auto hub_conn = std::make_shared<signalr::hub_connection>(url + U("custom"), U(""), signalr::trace_level::all, nullptr);
auto message = std::make_shared<utility::string_t>();
auto received_event = std::make_shared<signalr::event>();
@ -133,46 +100,6 @@ TEST(hub_connection_tests, send_message_after_connection_restart)
ASSERT_EQ(*message, U("[\"Send: test\"]"));
}
TEST(hub_connection_tests, send_message_after_reconnect)
{
auto hub_conn = std::make_shared<signalr::hub_connection>(url);
auto message = std::make_shared<utility::string_t>();
auto reconnected_event = std::make_shared<signalr::event>();
auto received_event = std::make_shared<signalr::event>();
hub_conn->set_reconnected([reconnected_event]()
{
reconnected_event->set();
});
hub_conn->on(U("sendString"), [message, received_event](const web::json::value& arguments)
{
*message = arguments.serialize();
received_event->set();
});
hub_conn->start().get();
try
{
hub_conn->send(U("forceReconnect")).get();
}
catch (...)
{
}
ASSERT_FALSE(reconnected_event->wait(2000));
web::json::value obj{};
obj[0] = web::json::value(U("test"));
hub_conn->invoke(U("invokeWithString"), obj).get();
ASSERT_FALSE(received_event->wait(2000));
ASSERT_EQ(*message, U("[\"Send: test\"]"));
}
TEST(hub_connection_tests, send_message_empty_param)
{
auto hub_conn = std::make_shared<signalr::hub_connection>(url);
@ -288,35 +215,13 @@ TEST(hub_connection_tests, send_message_complex_type_return)
ASSERT_EQ(test.serialize(), U("{\"Address\":{\"Street\":\"main st\",\"Zip\":\"98052\"},\"Age\":15,\"Name\":\"test\"}"));
}
TEST(hub_connection_tests, connection_id_start_stop_start_reconnect)
TEST(hub_connection_tests, connection_id_start_stop_start)
{
auto hub_conn = std::make_shared<signalr::hub_connection>(url);
auto weak_hub_conn = std::weak_ptr<signalr::hub_connection>(hub_conn);
auto reconnecting_event = std::make_shared<signalr::event>();
auto reconnected_event = std::make_shared<signalr::event>();
utility::string_t connection_id;
hub_conn->set_reconnecting([weak_hub_conn, reconnecting_event, &connection_id]()
{
auto conn = weak_hub_conn.lock();
if (conn)
{
ASSERT_EQ(conn->get_connection_id(), connection_id);
}
reconnecting_event->set();
});
hub_conn->set_reconnected([weak_hub_conn, reconnected_event, &connection_id]()
{
auto conn = weak_hub_conn.lock();
if (conn)
{
ASSERT_EQ(conn->get_connection_id(), connection_id);
}
reconnected_event->set();
});
ASSERT_EQ(U(""), hub_conn->get_connection_id());
hub_conn->start().get();
@ -331,17 +236,6 @@ TEST(hub_connection_tests, connection_id_start_stop_start_reconnect)
ASSERT_NE(hub_conn->get_connection_id(), connection_id);
connection_id = hub_conn->get_connection_id();
try
{
hub_conn->send(U("forceReconnect")).get();
}
catch (...)
{
}
ASSERT_FALSE(reconnecting_event->wait(2000));
ASSERT_FALSE(reconnected_event->wait(2000));
}
//TEST(hub_connection_tests, mirror_header)

View File

@ -33,7 +33,7 @@ TEST(connection_impl_connection_state, initial_connection_state_is_disconnected)
TEST(connection_impl_start, cannot_start_non_disconnected_exception)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{\"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client);
connection->start().wait();
@ -82,7 +82,7 @@ TEST(connection_impl_start, connection_state_is_connecting_when_connection_is_be
TEST(connection_impl_start, connection_state_is_connected_when_connection_established_succesfully)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{\"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client);
connection->start().get();
ASSERT_EQ(connection->get_connection_state(), connection_state::connected);
@ -270,7 +270,7 @@ TEST(connection_impl_start, start_fails_if_start_request_fails)
auto websocket_client = std::make_shared<test_websocket_client>();
websocket_client->set_receive_function([]()->pplx::task<std::string>
{
return pplx::task_from_result(std::string("{\"C\":\"x\", \"S\":1, \"M\":[] }"));
return pplx::task_from_result(std::string("{ }\x1e"));
});
auto connection =
@ -359,7 +359,7 @@ TEST(connection_impl_process_response, process_response_logs_messages)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{\"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client, writer, trace_level::messages);
connection->start().get();
@ -376,7 +376,7 @@ TEST(connection_impl_send, message_sent)
utility::string_t actual_message;
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{\"C\":\"x\", \"S\":1, \"M\":[] }")); },
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */ [&actual_message](const utility::string_t& message)
{
actual_message = message;
@ -417,7 +417,7 @@ TEST(connection_impl_send, exceptions_from_send_logged_and_propagated)
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{\"C\":\"x\", \"S\":1, \"M\":[] }")); },
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */ [](const utility::string_t&){ return pplx::task_from_exception<void>(std::runtime_error("error")); });
auto connection = create_connection(websocket_client, writer, trace_level::errors);
@ -452,7 +452,7 @@ TEST(connection_impl_set_message_received, callback_invoked_when_message_receive
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{ \"C\":\"x\", \"G\":\"gr0\", \"M\":[]}",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"Test\"] }",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"release\"] }",
@ -496,7 +496,7 @@ TEST(connection_impl_set_message_received, exception_from_callback_caught_and_lo
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"throw\"] }",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"release\"] }",
"{}"
@ -542,7 +542,7 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"throw\"] }",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"release\"] }",
"{}"
@ -589,7 +589,7 @@ TEST(connection_impl_set_message_received, error_logged_for_malformed_payload)
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{ 42",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"release\"] }",
"{}"
@ -629,7 +629,7 @@ TEST(connection_impl_set_message_received, unexpected_responses_logged)
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"42",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"release\"] }",
"{}"
@ -664,7 +664,7 @@ TEST(connection_impl_set_message_received, unexpected_responses_logged)
void can_be_set_only_in_disconnected_state(std::function<void(connection_impl *)> callback, const char* expected_exception_message)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client);
connection->start().get();
@ -694,20 +694,6 @@ TEST(connection_impl_set_configuration, set_message_received_json_callback_can_b
"cannot set the callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_reconnecting_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_reconnecting([](){}); },
"cannot set the reconnecting callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_reconnected_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_reconnected([](){}); },
"cannot set the reconnected callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_disconnected_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
@ -715,13 +701,6 @@ TEST(connection_impl_set_configuration, set_disconnected_callback_can_be_set_onl
"cannot set the disconnected callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_reconnect_delay_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_reconnect_delay(100); },
"cannot set reconnect delay when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_stop, stopping_disconnected_connection_is_no_op)
{
std::shared_ptr<log_writer> writer{ std::make_shared<memory_log_writer>() };
@ -742,7 +721,7 @@ TEST(connection_impl_stop, stopping_disconnecting_connection_returns_cancelled_t
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); },
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */ [&close_event](const web::uri&) { return pplx::task_from_result(); },
/* close function */ [&close_event]()
@ -784,7 +763,7 @@ TEST(connection_impl_stop, can_start_and_stop_connection)
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->start()
@ -807,7 +786,7 @@ TEST(connection_impl_stop, can_start_and_stop_connection_multiple_times)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->start()
@ -852,7 +831,7 @@ TEST(connection_impl_stop, dtor_stops_the_connection)
/* receive function */ []()
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }"));
return pplx::task_from_result(std::string("{ }\x1e"));
});
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
@ -885,7 +864,7 @@ TEST(connection_impl_stop, stop_cancels_ongoing_start_request)
/* receive function */ [disconnect_completed_event]()
{
disconnect_completed_event->wait();
return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }"));
return pplx::task_from_result(std::string("{ }\x1e"));
});
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
@ -984,7 +963,7 @@ TEST(connection_impl_stop, stop_ignores_exceptions_from_abort_requests)
});
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection =
connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::state_changes,
@ -1009,7 +988,7 @@ TEST(connection_impl_stop, stop_ignores_exceptions_from_abort_requests)
TEST(connection_impl_stop, stop_invokes_disconnected_callback)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client);
auto disconnected_invoked = false;
@ -1050,7 +1029,7 @@ TEST(connection_impl_stop, exception_for_disconnected_callback_caught_and_logged
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->set_disconnected([](){ throw 42; });
@ -1128,7 +1107,7 @@ TEST(connection_impl_change_state, change_state_logs)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{\"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->start().wait();
@ -1140,695 +1119,6 @@ TEST(connection_impl_change_state, change_state_logs)
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), entry);
}
TEST(connection_impl_reconnect, can_reconnect)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
auto connection = create_connection(websocket_client);
connection->set_reconnect_delay(100);
auto reconnected_event = std::make_shared<event>();
connection->set_reconnected([reconnected_event](){ reconnected_event->set(); });
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
ASSERT_EQ(connection_state::connected, connection->get_connection_state());
}
TEST(connection_impl_reconnect, successful_reconnect_state_changes)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->set_reconnect_delay(100);
auto reconnected_event = std::make_shared<event>();
connection->set_reconnected([reconnected_event](){ reconnected_event->set(); });
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(4U, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> connected\n"), remove_date_from_log_entry(log_entries[3]));
}
TEST(connection_impl_reconnect, connection_stopped_if_reconnecting_failed)
{
auto web_request_factory = std::make_unique<test_web_request_factory>([](const web::uri& url)
{
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: url.path() == _XPLATSTR("/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
int reconnect_invocations = 0;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[&reconnect_invocations](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
reconnect_invocations++;
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection =
connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::state_changes,
writer, std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
auto disconnected_event = std::make_shared<event>();
connection->set_disconnected([disconnected_event](){ disconnected_event->set(); });
connection->set_reconnect_delay(100);
connection->start();
ASSERT_FALSE(disconnected_event->wait(5000));
ASSERT_EQ(connection_state::disconnected, connection->get_connection_state());
ASSERT_GE(reconnect_invocations, 2);
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(5U, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[4]));
}
TEST(connection_impl_reconnect, reconnect_works_if_connection_dropped_during_after_init_and_before_start_successfully_completed)
{
auto connection_dropped_event = std::make_shared<event>();
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, connection_dropped_event]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number == 1)
{
connection_dropped_event->set();
return pplx::task_from_exception<std::string>(std::runtime_error("connection exception"));
}
return pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->set_reconnect_delay(100);
auto reconnected_event = std::make_shared<event>();
connection->set_reconnected([reconnected_event](){ reconnected_event->set(); });
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(4U, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> connected\n"), remove_date_from_log_entry(log_entries[3]));
}
TEST(connection_impl_reconnect, reconnect_cancelled_if_connection_dropped_during_start_and_start_failed)
{
auto connection_dropped_event = std::make_shared<event>();
auto web_request_factory = std::make_unique<test_web_request_factory>([&connection_dropped_event](const web::uri& url)
{
if (url.path() == _XPLATSTR("/start"))
{
connection_dropped_event->wait();
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)404, _XPLATSTR("Bad request"), _XPLATSTR("")));
}
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, connection_dropped_event]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number == 1)
{
connection_dropped_event->set();
return pplx::task_from_exception<std::string>(std::runtime_error("connection exception"));
}
return pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection =
connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::state_changes | trace_level::info,
writer, std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
try
{
connection->start().get();
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const std::exception&)
{
}
// Reconnecting happens on its own thread. If the connection is dropped after a successfull /connect but before the
// entire start sequence completes the reconnect thread is blocked to see if the starts sequence succeded or not.
// If the start sequence ultimately fails the reconnect logic will not be run - the reconnect thread will exit.
// However there is no further synchronization between start and reconnect threads so the order in which they will
// finish is not defined. Note that this does not matter for the user since they don't directly depend on/observe
// the reconnect in any way. In tests however if the start thread finishes first we can get here while the reconnect
// thread still has not finished. This would make the test fail so we need to wait until the reconnect thread finishes
// which will be when it logs a message that it is giving up reconnecting.
auto memory_writer = std::dynamic_pointer_cast<memory_log_writer>(writer);
for (int wait_time_ms = 5; wait_time_ms < 100 && memory_writer->get_log_entries().size() < 6; wait_time_ms <<= 1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_ms));
}
auto log_entries = memory_writer->get_log_entries();
ASSERT_EQ(6U, log_entries.size()) << dump_vector(log_entries);
auto state_changes = filter_vector(log_entries, _XPLATSTR("[state change]"));
ASSERT_EQ(2U, state_changes.size()) << dump_vector(log_entries);
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(state_changes[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> disconnected\n"), remove_date_from_log_entry(state_changes[1]));
auto info_entries = filter_vector(log_entries, _XPLATSTR("[info ]"));
ASSERT_EQ(4U, info_entries.size()) << dump_vector(log_entries);
ASSERT_EQ(_XPLATSTR("[info ] [websocket transport] connecting to: ws://reconnect_cancelled_if_connection_dropped_during_start_and_start_failed/connect?transport=webSockets&clientProtocol=1.4&connectionToken=A%3D%3D\n"), remove_date_from_log_entry(info_entries[0]));
ASSERT_EQ(_XPLATSTR("[info ] connection lost - trying to re-establish connection\n"), remove_date_from_log_entry(info_entries[1]));
ASSERT_EQ(_XPLATSTR("[info ] acquired lock before invoking reconnecting callback\n"), remove_date_from_log_entry(info_entries[2]));
ASSERT_EQ(_XPLATSTR("[info ] reconnecting cancelled - connection is not in the connected state\n"), remove_date_from_log_entry(info_entries[3]));
}
TEST(connection_impl_reconnect, reconnect_cancelled_when_connection_being_stopped)
{
std::atomic<bool> connection_started{ false };
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, &connection_started]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}"
};
call_number = std::min(call_number + 1, 1);
return connection_started
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::all);
connection->set_reconnect_delay(100);
event reconnecting_event{};
connection->set_reconnecting([&reconnecting_event](){ reconnecting_event.set(); });
connection->start().then([&connection_started](){ connection_started = true; });
ASSERT_FALSE(reconnecting_event.wait(5000));
connection->stop().get();
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
auto state_changes = filter_vector(log_entries, _XPLATSTR("[state change]"));
ASSERT_EQ(state_changes.size(), 5U);
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(state_changes[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(state_changes[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(state_changes[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(state_changes[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(state_changes[4]));
// there is an iherent race between stop and reconnect to acquire the lock which results in finishing reconnecting
// in one of two ways and, sometimes, in completing stopping the connection before finishing reconnecting
for (int wait_time_ms = 5; wait_time_ms < 100; wait_time_ms <<= 1)
{
log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
if ((filter_vector(log_entries, _XPLATSTR("[info ] reconnecting cancelled - connection is being stopped. line")).size() +
filter_vector(log_entries, _XPLATSTR("[info ] reconnecting cancelled - connection was stopped and restarted after reconnecting started")).size()) != 0)
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_ms));
}
ASSERT_EQ(1U,
filter_vector(log_entries, _XPLATSTR("[info ] reconnecting cancelled - connection is being stopped. line")).size() +
filter_vector(log_entries, _XPLATSTR("[info ] reconnecting cancelled - connection was stopped and restarted after reconnecting started")).size())
<< dump_vector(log_entries);
}
TEST(connection_impl_reconnect, reconnect_cancelled_if_connection_goes_out_of_scope)
{
std::atomic<bool> connection_started{ false };
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, &connection_started]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}"
};
call_number = std::min(call_number + 1, 1);
return connection_started
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
{
auto connection = create_connection(websocket_client, writer, trace_level::all);
connection->set_reconnect_delay(100);
event reconnecting_event{};
connection->set_reconnecting([&reconnecting_event](){ reconnecting_event.set(); });
connection->start().then([&connection_started](){ connection_started = true; });
ASSERT_FALSE(reconnecting_event.wait(5000));
}
// The connection_impl destructor does can be called on a different thread. This is because it is being internally
// used by tasks as a shared_ptr. As a result the dtor is being called on the thread which released the last reference.
// Therefore we need to wait block until the dtor has actually completed. Time out would most likely indicate a bug.
auto memory_writer = std::dynamic_pointer_cast<memory_log_writer>(writer);
for (int wait_time_ms = 5; wait_time_ms < 10000; wait_time_ms <<= 1)
{
if (filter_vector(std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries(),
_XPLATSTR("[state change] disconnecting -> disconnected")).size() > 0)
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_ms));
}
auto log_entries = memory_writer->get_log_entries();
auto state_changes = filter_vector(log_entries, _XPLATSTR("[state change]"));
ASSERT_EQ(5U, state_changes.size()) << dump_vector(log_entries);
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(state_changes[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(state_changes[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(state_changes[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(state_changes[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(state_changes[4]));
}
TEST(connection_impl_reconnect, std_exception_for_reconnected_reconnecting_callback_caught_and_logged)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->set_reconnect_delay(100);
connection->set_reconnecting([](){ throw std::runtime_error("exception from reconnecting"); });
auto reconnected_event = std::make_shared<event>();
connection->set_reconnected([reconnected_event]()
{
reconnected_event->set();
throw std::runtime_error("exception from reconnected");
});
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
ASSERT_EQ(connection_state::connected, connection->get_connection_state());
auto memory_writer = std::dynamic_pointer_cast<memory_log_writer>(writer);
for (int wait_time_ms = 5; wait_time_ms < 100 && memory_writer->get_log_entries().size() < 3; wait_time_ms <<= 1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_ms));
}
auto log_entries = memory_writer->get_log_entries();
ASSERT_EQ(_XPLATSTR("[error ] reconnecting callback threw an exception: exception from reconnecting\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[error ] reconnected callback threw an exception: exception from reconnected\n"), remove_date_from_log_entry(log_entries[2]));
}
TEST(connection_impl_reconnect, exception_for_reconnected_reconnecting_callback_caught_and_logged)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->set_reconnect_delay(100);
connection->set_reconnecting([](){ throw 42; });
auto reconnected_event = std::make_shared<event>();
connection->set_reconnected([reconnected_event]()
{
reconnected_event->set();
throw 42;
});
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
ASSERT_EQ(connection_state::connected, connection->get_connection_state());
auto memory_writer = std::dynamic_pointer_cast<memory_log_writer>(writer);
for (int wait_time_ms = 5; wait_time_ms < 100 && memory_writer->get_log_entries().size() < 3; wait_time_ms <<= 1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_ms));
}
auto log_entries = memory_writer->get_log_entries();
ASSERT_EQ(3U, log_entries.size());
ASSERT_EQ(_XPLATSTR("[error ] reconnecting callback threw an unknown exception\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[error ] reconnected callback threw an unknown exception\n"), remove_date_from_log_entry(log_entries[2]));
}
TEST(connection_impl_reconnect, can_stop_connection_from_reconnecting_event)
{
auto web_request_factory = std::make_unique<test_web_request_factory>([](const web::uri& url)
{
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: url.path() == _XPLATSTR("/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection =
connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::state_changes,
writer, std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
auto stop_event = std::make_shared<event>();
connection->set_reconnecting([&connection, stop_event]()
{
connection->stop()
.then([stop_event](){ stop_event->set(); });
});
connection->set_reconnect_delay(100);
connection->start();
ASSERT_FALSE(stop_event->wait(5000));
ASSERT_EQ(connection_state::disconnected, connection->get_connection_state());
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(5U, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[4]));
}
TEST(connection_impl_reconnect, current_reconnect_cancelled_if_another_reconnect_initiated_from_reconnecting_event)
{
auto web_request_factory = std::make_unique<test_web_request_factory>([](const web::uri& url)
{
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: url.path() == _XPLATSTR("/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
auto allow_reconnect = std::make_shared<std::atomic<bool>>(false);
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, allow_reconnect]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = (call_number + 1) % 4;
return call_number == 2 && !(*allow_reconnect)
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[allow_reconnect](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect") && !(*allow_reconnect))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection =
connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::all, writer,
std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
auto reconnecting_count = 0;
connection->set_reconnecting([&connection, reconnecting_count, allow_reconnect]() mutable
{
if (++reconnecting_count == 1)
{
connection->stop().get();
connection->start().get();
*allow_reconnect = true;
}
});
event reconnected_event;
connection->set_reconnected([&reconnected_event]()
{
reconnected_event.set();
});
connection->set_reconnect_delay(100);
connection->start();
ASSERT_FALSE(reconnected_event.wait(5000));
ASSERT_EQ(connection_state::connected, connection->get_connection_state());
// There are two racing reconnect attemps happening at the same time. The second one sets the reconnect_event and
// unblocks the tests so that verification can happen. Sometimes however the second reconnect one finishes before
// the first and verification fails. We are blocking here until we get the expected message from the first reconnect
// or timeout. The threads doing reconnects are not observable outside so this is the only way to verify that both
// reconnect attempts have actually completed.
for (int wait_time_ms = 5; wait_time_ms < 100; wait_time_ms <<= 1)
{
if (filter_vector(std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries(),
_XPLATSTR("[info ] reconnecting cancelled - connection was stopped and restarted after reconnecting started")).size() > 0)
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_ms));
}
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(1U,
filter_vector(log_entries, _XPLATSTR("[info ] reconnecting cancelled - connection was stopped and restarted after reconnecting started")).size())
<< dump_vector(log_entries);
auto state_changes = filter_vector(log_entries, _XPLATSTR("[state change]"));
ASSERT_EQ(9U, state_changes.size()) << dump_vector(log_entries);
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(state_changes[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(state_changes[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(state_changes[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(state_changes[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(state_changes[4]));
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(state_changes[5]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(state_changes[6]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(state_changes[7]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> connected\n"), remove_date_from_log_entry(state_changes[8]));
}
TEST(connection_id, connection_id_is_set_if_start_fails_but_negotiate_request_succeeds)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
@ -1867,7 +1157,7 @@ TEST(connection_id, can_get_connection_id_when_connection_in_connected_state)
auto writer = std::shared_ptr<log_writer>{ std::make_shared<memory_log_writer>() };
auto websocket_client = create_test_websocket_client(
/* receive function */ [](){ return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ [](){ return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
utility::string_t connection_id;
@ -1886,7 +1176,7 @@ TEST(connection_id, can_get_connection_id_after_connection_has_stopped)
auto writer = std::shared_ptr<log_writer>{ std::make_shared<memory_log_writer>() };
auto websocket_client = create_test_websocket_client(
/* receive function */ [](){ return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ [](){ return pplx::task_from_result(std::string("{ }\x1e")); });
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->start()
@ -1905,16 +1195,15 @@ TEST(connection_id, connection_id_reset_when_starting_connection)
auto writer = std::shared_ptr<log_writer>{ std::make_shared<memory_log_writer>() };
auto websocket_client = create_test_websocket_client(
/* receive function */ [](){ return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ [](){ return pplx::task_from_result(std::string("{ }\x1e")); });
auto web_request_factory = std::make_unique<test_web_request_factory>([&fail_http_requests](const web::uri &url) -> std::unique_ptr<web_request>
{
if (!fail_http_requests) {
auto response_body =
url.path() == _XPLATSTR("/negotiate") || url.path() == _XPLATSTR("/signalr/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"KeepAliveTimeout\" : 20.0, \"DisconnectTimeout\" : 10.0, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{ \"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"availableTransports\" : [] }")
: url.path() == _XPLATSTR("/start") || url.path() == _XPLATSTR("/signalr/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");

View File

@ -16,11 +16,11 @@ using namespace signalr;
std::shared_ptr<hub_connection_impl> create_hub_connection(std::shared_ptr<websocket_client> websocket_client = create_test_websocket_client(),
std::shared_ptr<log_writer> log_writer = std::make_shared<trace_log_writer>(), trace_level trace_level = trace_level::all)
{
return hub_connection_impl::create(create_uri(), _XPLATSTR(""), trace_level, log_writer, /*use_default_url*/true,
return hub_connection_impl::create(create_uri(), _XPLATSTR(""), trace_level, log_writer,
create_test_web_request_factory(), std::make_unique<test_transport_factory>(websocket_client));
}
TEST(url, signalr_appended_to_url_if_use_default_url_true)
TEST(url, negotiate_appended_to_url)
{
utility::string_t base_urls[] = { _XPLATSTR("http://fakeuri"), _XPLATSTR("http://fakeuri/") };
@ -34,34 +34,7 @@ TEST(url, signalr_appended_to_url_if_use_default_url_true)
});
auto hub_connection = hub_connection_impl::create(base_url, _XPLATSTR(""), trace_level::none,
std::make_shared<trace_log_writer>(), /*use_default_url:*/ true, std::move(web_request_factory),
std::make_unique<test_transport_factory>(create_test_websocket_client()));
try
{
hub_connection->start().get();
}
catch (const std::exception&) { }
ASSERT_EQ(web::uri(_XPLATSTR("http://fakeuri/signalr/negotiate?clientProtocol=1.4")), requested_url);
}
}
TEST(url, signalr_not_appended_to_url_if_use_default_url_false)
{
utility::string_t base_urls[] = { _XPLATSTR("http://fakeuri"), _XPLATSTR("http://fakeuri/") };
for (const auto& base_url : base_urls)
{
web::uri requested_url;
auto web_request_factory = std::make_unique<test_web_request_factory>([&requested_url](const web::uri &url)
{
requested_url = url;
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)404, _XPLATSTR("Bad request"), _XPLATSTR("")));
});
auto hub_connection = hub_connection_impl::create(base_url, _XPLATSTR(""), trace_level::none,
std::make_shared<trace_log_writer>(), /*use_default_url:*/ false, std::move(web_request_factory),
std::make_shared<trace_log_writer>(), std::move(web_request_factory),
std::make_unique<test_transport_factory>(create_test_websocket_client()));
try
@ -70,14 +43,14 @@ TEST(url, signalr_not_appended_to_url_if_use_default_url_false)
}
catch (const std::exception&) {}
ASSERT_EQ(web::uri(_XPLATSTR("http://fakeuri/negotiate?clientProtocol=1.4")), requested_url);
ASSERT_EQ(web::uri(_XPLATSTR("http://fakeuri/negotiate")), requested_url);
}
}
TEST(start, start_starts_connection)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto hub_connection = create_hub_connection(websocket_client);
hub_connection->start().get();
@ -85,53 +58,10 @@ TEST(start, start_starts_connection)
ASSERT_EQ(connection_state::connected, hub_connection->get_connection_state());
}
TEST(start, start_sets_connection_data)
{
web::uri requested_url;
auto web_request_factory = std::make_unique<test_web_request_factory>([&requested_url](const web::uri &url)
{
requested_url = url;
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)404, _XPLATSTR("Bad request"), _XPLATSTR("")));
});
auto hub_connection = hub_connection_impl::create(create_uri(), _XPLATSTR(""), trace_level::none,
std::make_shared<trace_log_writer>(), /*use_default_url:*/ true, std::move(web_request_factory),
std::make_unique<test_transport_factory>(create_test_websocket_client()));
try
{
hub_connection->start().get();
}
catch (...)
{
}
ASSERT_TRUE(
requested_url == web::uri(create_uri().append(_XPLATSTR("/signalr/negotiate?clientProtocol=1.4&connectionData=%5B%7B%22Name%22:%22my_hub%22%7D,%7B%22Name%22:%22your_hub%22%7D%5D"))) ||
requested_url == web::uri(create_uri().append(_XPLATSTR("/signalr/negotiate?clientProtocol=1.4&connectionData=%5B%7B%22Name%22:%22your_hub%22%7D,%7B%22Name%22:%22my_hub%22%7D%5D"))));
}
TEST(start, start_logs_if_no_hub_proxies_exist_for_hub_connection)
{
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
auto hub_connection = create_hub_connection(websocket_client, writer, trace_level::info);
hub_connection->start().get();
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_FALSE(log_entries.empty());
auto entry = remove_date_from_log_entry(log_entries[0]);
ASSERT_EQ(_XPLATSTR("[info ] no hub proxies exist for this hub connection\n"), entry);
}
TEST(stop, stop_stops_connection)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto hub_connection = create_hub_connection(websocket_client);
hub_connection->start().get();
@ -143,7 +73,7 @@ TEST(stop, stop_stops_connection)
TEST(stop, disconnected_callback_called_when_hub_connection_stops)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto hub_connection = create_hub_connection(websocket_client);
auto disconnected_invoked = false;
@ -161,7 +91,7 @@ TEST(stop, connection_stopped_when_going_out_of_scope)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto hub_connection = create_hub_connection(websocket_client, writer, trace_level::state_changes);
hub_connection->start().get();
@ -193,7 +123,7 @@ TEST(stop, stop_cancels_pending_callbacks)
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{}"
};
@ -207,7 +137,7 @@ TEST(stop, stop_cancels_pending_callbacks)
auto hub_connection = create_hub_connection(websocket_client);
hub_connection->start().get();
auto t = hub_connection->invoke_void(_XPLATSTR("method"), json::value::array());
auto t = hub_connection->invoke_json(_XPLATSTR("method"), json::value::array());
hub_connection->stop();
try
@ -229,7 +159,7 @@ TEST(stop, pending_callbacks_finished_if_hub_connections_goes_out_of_scope)
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{}"
};
@ -241,12 +171,12 @@ TEST(stop, pending_callbacks_finished_if_hub_connections_goes_out_of_scope)
return pplx::task_from_result(responses[call_number]);
});
pplx::task<void> t;
pplx::task<web::json::value> t;
{
auto hub_connection = create_hub_connection(websocket_client);
hub_connection->start().get();
t = hub_connection->invoke_void(_XPLATSTR("method"), json::value::array());
t = hub_connection->invoke_json(_XPLATSTR("method"), json::value::array());
}
try
@ -268,12 +198,11 @@ TEST(hub_invocation, hub_connection_invokes_users_code_on_hub_invocations)
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ \"C\":\"d- F430FB19\", \"M\" : [{\"H\":\"my_HUB\", \"M\":\"BROADcast\", \"A\" : [\"message\", 1]}] }",
"{}"
"{ }\x1e",
"{ \"type\": 1, \"target\": \"BROADcast\", \"arguments\": [ \"message\", 1 ] }\x1e"
};
call_number = std::min(call_number + 1, 2);
call_number = std::min(call_number + 1, 1);
return pplx::task_from_result(responses[call_number]);
});
@ -302,7 +231,7 @@ TEST(hub_invocation, hub_connection_discards_persistent_connection_message_primi
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [\"Test\"] }",
"{ \"C\":\"d- F430FB19\", \"M\" : [{\"H\":\"my_hub\", \"M\":\"broadcast\", \"A\" : [\"signal event\", 1]}] }",
"{}"
@ -341,7 +270,7 @@ TEST(hub_invocation, hub_connection_invokes_persistent_connection_message_object
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ }\x1e",
"{ \"C\":\"d-486F0DF9-BAO,5|BAV,1|BAW,0\", \"M\" : [{\"Name\": \"Test\"}] }",
"{ \"C\":\"d- F430FB19\", \"M\" : [{\"H\":\"my_hub\", \"M\":\"broadcast\", \"A\" : [\"signal event\", 1]}] }",
"{}"
@ -377,7 +306,7 @@ TEST(invoke, invoke_creates_correct_payload)
utility::string_t payload;
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); },
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */[&payload](const utility::string_t& m)
{
payload = m;
@ -396,13 +325,13 @@ TEST(invoke, invoke_creates_correct_payload)
// the send is not setup to succeed because it's not needed in this test
}
ASSERT_EQ(_XPLATSTR("{\"A\":[],\"H\":\"my_hub\",\"I\":\"0\",\"M\":\"method\"}"), payload);
ASSERT_EQ(_XPLATSTR("{\"arguments\":[],\"target\":\"method\",\"type\":1}\x1e"), payload);
}
TEST(invoke, callback_not_called_if_send_throws)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); },
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); },
/* send function */[](const utility::string_t&) { return pplx::task_from_exception<void>(std::runtime_error("error")); });
auto hub_connection = create_hub_connection(websocket_client);
@ -424,46 +353,6 @@ TEST(invoke, callback_not_called_if_send_throws)
hub_connection->stop().get();
}
TEST(hub_invocation, hub_connection_logs_if_no_hub_for_invocation)
{
int call_number = -1;
auto done_event = std::make_shared<event>();
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, done_event]()
mutable {
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{ \"C\":\"d- F430FB19\", \"M\" : [{\"H\":\"my_hub\", \"M\":\"broadcast\", \"A\" : [\"message\", 1]}] }",
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number == 2)
{
done_event->set();
}
return pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto hub_connection = create_hub_connection(websocket_client, writer, trace_level::info);
auto payload = std::make_shared<utility::string_t>();
hub_connection->start().get();
ASSERT_FALSE(done_event->wait(5000));
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_TRUE(log_entries.size() > 2);
auto entry = remove_date_from_log_entry(log_entries[2]);
ASSERT_EQ(_XPLATSTR("[info ] no proxy found for hub invocation. hub: my_hub, method: broadcast\n"), entry);
}
TEST(invoke_json, invoke_returns_value_returned_from_the_server)
{
auto callback_registered_event = std::make_shared<event>();
@ -474,13 +363,11 @@ TEST(invoke_json, invoke_returns_value_returned_from_the_server)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"C\":\"x\", \"G\":\"gr0\", \"M\":[]}",
"{\"I\":\"0\", \"R\":\"abc\"}",
"{}"
"{ }\x1e",
"{ \"type\": 3, \"invocationId\": \"0\", \"result\": \"abc\" }\x1e"
};
call_number = std::min(call_number + 1, 3);
call_number = std::min(call_number + 1, 1);
if (call_number > 0)
{
@ -512,12 +399,11 @@ TEST(invoke_json, invoke_propagates_errors_from_server_as_exceptions)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\", \"E\" : \"Ooops\"}",
"{}"
"{ }\x1e",
"{ \"type\": 3, \"invocationId\": \"0\", \"error\": \"Ooops\" }\x1e"
};
call_number = std::min(call_number + 1, 2);
call_number = std::min(call_number + 1, 1);
if (call_number > 0)
{
@ -556,12 +442,11 @@ TEST(invoke_json, invoke_propagates_hub_errors_from_server_as_hub_exceptions)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": true, \"D\": { \"ErrorNumber\" : 42 }}",
"{}"
"{ }\x1e",
"{ \"type\": 3, \"invocationId\": \"0\", \"error\": \"Ooops\" }\x1e"
};
call_number = std::min(call_number + 1, 2);
call_number = std::min(call_number + 1, 1);
if (call_number > 0)
{
@ -587,7 +472,6 @@ TEST(invoke_json, invoke_propagates_hub_errors_from_server_as_hub_exceptions)
catch (const hub_exception& e)
{
ASSERT_STREQ("\"Ooops\"", e.what());
ASSERT_EQ(_XPLATSTR("{\"ErrorNumber\":42}"), e.error_data().serialize());
}
}
@ -601,12 +485,11 @@ TEST(invoke_void, invoke_unblocks_task_when_server_completes_call)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\"}",
"{}"
"{ }\x1e",
"{\"I\":\"0\"}\x1e"
};
call_number = std::min(call_number + 1, 2);
call_number = std::min(call_number + 1, 1);
if (call_number > 0)
{
@ -639,8 +522,8 @@ TEST(invoke_void, invoke_logs_if_callback_for_given_id_not_found)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"not tracked\"}",
"{ }\x1e",
"{\"I\":\"not tracked\"}\x1e",
"{}"
};
@ -677,8 +560,8 @@ TEST(invoke_void, invoke_propagates_errors_from_server_as_exceptions)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\", \"E\" : \"Ooops\"}",
"{ }\x1e",
"{\"I\":\"0\", \"E\" : \"Ooops\"}\x1e",
"{}"
};
@ -721,8 +604,8 @@ TEST(invoke_void, invoke_propagates_hub_errors_from_server_as_hub_exceptions)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": true, \"D\": { \"ErrorNumber\" : 42 }}",
"{ }\x1e",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": true, \"D\": { \"ErrorNumber\" : 42 }}\x1e",
"{}"
};
@ -752,7 +635,6 @@ TEST(invoke_void, invoke_propagates_hub_errors_from_server_as_hub_exceptions)
catch (const hub_exception& e)
{
ASSERT_STREQ("\"Ooops\"", e.what());
ASSERT_EQ(_XPLATSTR("{\"ErrorNumber\":42}"), e.error_data().serialize());
}
}
@ -766,8 +648,8 @@ TEST(invoke_void, invoke_creates_hub_exception_even_if_no_error_data)
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": true }",
"{ }\x1e",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": true }\x1e",
"{}"
};
@ -797,7 +679,6 @@ TEST(invoke_void, invoke_creates_hub_exception_even_if_no_error_data)
catch (const hub_exception& e)
{
ASSERT_STREQ("\"Ooops\"", e.what());
ASSERT_TRUE(e.error_data().is_null());
}
}
@ -811,8 +692,8 @@ TEST(invoke_void, invoke_creates_runtime_error_when_hub_exception_indicator_fals
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": false }",
"{ }\x1e",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": false }\x1e",
"{}"
};
@ -846,201 +727,54 @@ TEST(invoke_void, invoke_creates_runtime_error_when_hub_exception_indicator_fals
}
}
TEST(invoke_void, invoke_creates_runtime_error_even_if_hub_exception_indicator_exists_but_not_bool)
{
auto callback_registered_event = std::make_shared<event>();
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, callback_registered_event]()
mutable {
std::string responses[]
{
"{\"C\":\"x\", \"S\":1, \"M\":[] }",
"{\"I\":\"0\", \"E\" : \"Ooops\", \"H\": 42 }",
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number > 0)
{
callback_registered_event->wait();
}
return pplx::task_from_result(responses[call_number]);
});
auto hub_connection = create_hub_connection(websocket_client);
try
{
hub_connection->start()
.then([hub_connection, callback_registered_event]()
{
auto t = hub_connection->invoke_void(_XPLATSTR("method"), json::value::array());
callback_registered_event->set();
return t;
}).get();
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const signalr_exception& e)
{
ASSERT_STREQ("\"Ooops\"", e.what());
ASSERT_TRUE(dynamic_cast<const hub_exception *>(&e) == nullptr);
}
}
TEST(reconnect, pending_invocations_finished_if_connection_lost)
{
auto message_sent_event = std::make_shared<event>();
auto init_sent = false;
auto websocket_client = create_test_websocket_client(
/* receive function */ [init_sent, message_sent_event]() mutable
{
if(init_sent)
{
message_sent_event->wait();
return pplx::task_from_exception<std::string>(std::runtime_error("connection exception"));
}
init_sent = true;
return pplx::task_from_result<std::string>("{ \"C\":\"x\", \"S\":1, \"M\":[] }");
},
/* send function */ [](const utility::string_t){ return pplx::task_from_result(); },
/* connect function */[](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
auto hub_connection = create_hub_connection(websocket_client);
auto test_completed_event = std::make_shared<event>();
hub_connection->start()
.then([hub_connection, message_sent_event, test_completed_event]()
{
auto invoke_task = hub_connection->invoke_void(_XPLATSTR("TestMethod"), json::value::array())
.then([test_completed_event, hub_connection](pplx::task<void> invoke_void_task)
{
try
{
invoke_void_task.get();
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const std::exception& e)
{
ASSERT_STREQ("\"connection has been lost\"", e.what());
}
});
message_sent_event->set();
return invoke_task;
}).get();
}
TEST(reconnect, pending_invocations_finished_and_custom_reconnecting_callback_invoked_if_connection_lost)
{
auto message_sent_event = std::make_shared<event>();
auto init_sent = false;
auto websocket_client = create_test_websocket_client(
/* receive function */ [init_sent, message_sent_event]() mutable
{
if (init_sent)
{
message_sent_event->wait();
return pplx::task_from_exception<std::string>(std::runtime_error("connection exception"));
}
init_sent = true;
return pplx::task_from_result<std::string>("{ \"C\":\"x\", \"S\":1, \"M\":[] }");
},
/* send function */ [](const utility::string_t){ return pplx::task_from_result(); },
/* connect function */[](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
auto hub_connection = create_hub_connection(websocket_client);
auto reconnecting_invoked_event = std::make_shared<event >();
hub_connection->set_reconnecting([reconnecting_invoked_event](){ reconnecting_invoked_event->set(); });
hub_connection->start()
.then([hub_connection, message_sent_event]()
{
auto invoke_task = hub_connection->invoke_void(_XPLATSTR("TestMethod"), json::value::array())
.then([hub_connection](pplx::task<void> invoke_void_task)
{
try
{
invoke_void_task.get();
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const std::exception& e)
{
ASSERT_STREQ("\"connection has been lost\"", e.what());
}
});
message_sent_event->set();
return invoke_task;
}).get();
ASSERT_FALSE(reconnecting_invoked_event->wait(5000));
}
TEST(reconnect, reconnecting_reconnected_callbacks_invoked)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
auto hub_connection = create_hub_connection(websocket_client);
auto reconnecting_invoked = false;
hub_connection->set_reconnecting([&reconnecting_invoked](){ reconnecting_invoked = true; });
auto reconnected_event = std::make_shared<event>();
hub_connection->set_reconnected([reconnected_event]() { reconnected_event->set(); });
hub_connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
ASSERT_TRUE(reconnecting_invoked);
}
//TEST(invoke_void, invoke_creates_runtime_error)
//{
// auto callback_registered_event = std::make_shared<event>();
//
// int call_number = -1;
// auto websocket_client = create_test_websocket_client(
// /* receive function */ [call_number, callback_registered_event]()
// mutable {
// std::string responses[]
// {
// "{ }\x1e",
// "{ \"type\": 3, \"invocationId\": \"0\", \"error\": \"Ooops\" }\x1e"
// };
//
// call_number = std::min(call_number + 1, 1);
//
// if (call_number > 0)
// {
// callback_registered_event->wait();
// }
//
// return pplx::task_from_result(responses[call_number]);
// });
//
// auto hub_connection = create_hub_connection(websocket_client);
// try
// {
// hub_connection->start()
// .then([hub_connection, callback_registered_event]()
// {
// auto t = hub_connection->invoke_void(_XPLATSTR("method"), json::value::array());
// callback_registered_event->set();
// return t;
// }).get();
//
// ASSERT_TRUE(false); // exception expected but not thrown
// }
// catch (const signalr_exception& e)
// {
// ASSERT_STREQ("\"Ooops\"", e.what());
// ASSERT_TRUE(dynamic_cast<const hub_exception *>(&e) == nullptr);
// }
//}
TEST(connection_id, can_get_connection_id)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto hub_connection = create_hub_connection(websocket_client);
ASSERT_EQ(_XPLATSTR(""), hub_connection->get_connection_id());

View File

@ -8,10 +8,7 @@ using namespace signalr;
TEST(hub_exception_initialization, hub_exception_initialized_correctly)
{
auto error_data = web::json::value::parse(_XPLATSTR("{\"SomeData\" : 42 }"));
auto e = hub_exception{ _XPLATSTR("error"), error_data };
auto e = hub_exception{ _XPLATSTR("error") };
ASSERT_STREQ("error", e.what());
ASSERT_EQ(error_data.serialize(), e.error_data().serialize());
}

View File

@ -48,9 +48,9 @@ TEST(on, cannot_register_handler_if_connection_not_in_disconnected_state)
try
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
/* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); });
auto hub_connection = hub_connection_impl::create(_XPLATSTR("http://fakeuri"), _XPLATSTR(""), trace_level::all,
std::make_shared<trace_log_writer>(), /*use_default_url*/true, create_test_web_request_factory(),
std::make_shared<trace_log_writer>(), create_test_web_request_factory(),
std::make_unique<test_transport_factory>(websocket_client));
hub_connection->start().get();
@ -102,7 +102,7 @@ TEST(invoke_json, invoke_throws_when_the_underlying_connection_is_not_valid)
try
{
hub_connection.invoke(_XPLATSTR("method"), web::json::value()).get();
hub_connection.invoke(_XPLATSTR("method")).get();
ASSERT_TRUE(true); // exception expected but not thrown
}
catch (const signalr_exception& e)
@ -117,11 +117,11 @@ TEST(invoke_void, send_throws_when_the_underlying_connection_is_not_valid)
try
{
hub_connection.send(_XPLATSTR("method"), web::json::value()).get();
hub_connection.send(_XPLATSTR("method")).get();
ASSERT_TRUE(true); // exception expected but not thrown
}
catch (const signalr_exception& e)
{
ASSERT_STREQ("the connection for which this hub proxy was created is no longer valid - it was either destroyed or went out of scope", e.what());
}
}
}

View File

@ -17,9 +17,8 @@ TEST(request_sender_negotiate, request_created_with_correct_url)
auto request_factory = test_web_request_factory([&requested_url](const web::uri &url) -> std::unique_ptr<web_request>
{
utility::string_t response_body(
_XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"KeepAliveTimeout\" : 20.0, \"DisconnectTimeout\" : 30.0, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}"));
_XPLATSTR("{ \"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"availableTransports\" : [] }"));
requested_url = url;
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
@ -27,7 +26,7 @@ TEST(request_sender_negotiate, request_created_with_correct_url)
request_sender::negotiate(request_factory, web::uri{ _XPLATSTR("http://fake/signalr") }, _XPLATSTR("")).get();
ASSERT_EQ(web::uri(_XPLATSTR("http://fake/signalr/negotiate?clientProtocol=1.4&connectionData=data")), requested_url);
ASSERT_EQ(web::uri(_XPLATSTR("http://fake/signalr/negotiate")), requested_url);
}
TEST(request_sender_negotiate, negotiation_request_sent_and_response_serialized)
@ -35,9 +34,8 @@ TEST(request_sender_negotiate, negotiation_request_sent_and_response_serialized)
auto request_factory = test_web_request_factory([](const web::uri&) -> std::unique_ptr<web_request>
{
utility::string_t response_body(
_XPLATSTR("{\"Invocation\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"KeepAliveTimeout\" : 20.0, \"DisconnectTimeout\" : 30.0, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.5, \"LongPollDelay\" : 0.0}"));
_XPLATSTR("{\"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"availableTransports\" : [] }"));
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
@ -46,4 +44,4 @@ TEST(request_sender_negotiate, negotiation_request_sent_and_response_serialized)
ASSERT_EQ(_XPLATSTR("f7707523-307d-4cba-9abf-3eef701241e8"), response.connection_id);
// TODO: response.availableTransports
}
}

View File

@ -36,10 +36,9 @@ std::unique_ptr<web_request_factory> create_test_web_request_factory()
return std::make_unique<test_web_request_factory>([](const web::uri& url)
{
auto response_body =
url.path() == _XPLATSTR("/negotiate") || url.path() == _XPLATSTR("/signalr/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"KeepAliveTimeout\" : 20.0, \"DisconnectTimeout\" : 10.0, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"availableTransports\" : [] }")
: url.path() == _XPLATSTR("/start") || url.path() == _XPLATSTR("/signalr/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");
@ -83,4 +82,4 @@ utility::string_t dump_vector(const std::vector<utility::string_t>& source)
}
return ss.str();
}
}

View File

@ -324,11 +324,11 @@ TEST(websocket_transport_receive_loop, receive_loop_logs_websocket_exceptions)
trace_level::errors);
}
TEST(websocket_transport_receive_loop, receive_loop_logs_if_receive_task_cancelled)
TEST(websocket_transport_receive_loop, receive_loop_logs_if_receive_task_canceled)
{
receive_loop_logs_exception_runner(
pplx::task_canceled("cancelled"),
_XPLATSTR("[info ] [websocket transport] receive task cancelled.\n"),
pplx::task_canceled("canceled"),
_XPLATSTR("[info ] [websocket transport] receive task canceled.\n"),
trace_level::info);
}
@ -443,4 +443,4 @@ TEST(websocket_transport_get_transport_type, get_transport_type_returns_websocke
[](const utility::string_t&){}, [](const std::exception&){});
ASSERT_EQ(transport_type::websockets, ws_transport->get_transport_type());
}
}