8 #define DRIPLINE_API_EXPORTS
16 #include "param_node.hh"
20 LOGGER(
dlog,
"heartbeater" );
27 f_heartbeat_interval_s( 60 ),
28 f_check_timeout_ms( 1000 ),
29 f_service( a_service ),
36 cancelable::operator=( std::move(a_orig) );
37 f_heartbeat_interval_s = a_orig.f_heartbeat_interval_s;
38 f_check_timeout_ms = a_orig.f_check_timeout_ms;
39 f_service = std::move( a_orig.f_service );
48 throw dripline_error() <<
"Unable to start heartbeater because service pointer is not set";
51 if( f_heartbeat_interval_s == 0 )
53 LINFO(
dlog,
"Heartbeat disabled" );
57 scarab::param_ptr_t t_payload_ptr(
new scarab::param_node() );
58 scarab::param_node& t_payload = t_payload_ptr->as_node();
59 t_payload.add(
"name", a_name );
63 t_key.push_back( a_routing_key );
64 t_key.push_back( a_name );
68 LINFO(
dlog,
"Starting heartbeat loop" );
70 auto t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
71 while( ! f_canceled.load() )
74 std::this_thread::sleep_for( std::chrono::milliseconds( f_check_timeout_ms ) );
77 if( std::chrono::steady_clock().now() >= t_next_heartbeat_at && ! f_canceled.load() )
79 LDEBUG(
dlog,
"Sending heartbeat" );
86 t_receive_reply = f_service->send( t_alert_ptr );
88 if( ! t_receive_reply->f_successful_send )
90 LERROR(
dlog,
"Failed to send reply:\n" + t_receive_reply->f_send_error_message );
95 LWARN(
dlog,
"Operating in offline mode; message not sent" );
99 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
103 LERROR(
dlog,
"Dripline error while sending reply:\n" << e.what() );
106 t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
Error indicating a problem with the connection to the broker.
Dripline-specific errors.
A heartbeater repeatedly sends an alert at a fixed time interval.
heartbeater & operator=(const heartbeater &)=delete
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
std::thread f_heartbeat_thread
heartbeater(service *a_service)
Primary constructor. A service pointer is required to be able to send messages.
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
Parses routing keys and stores the tokenized information.
std::string to_string() const
Converts the routing-key tokens into a single string.
Primary unit of software that connects to a broker and typically provides an interface with an instru...
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("heartbeater", __FILE_NAME__, __LINE__)
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
std::shared_ptr< msg_alert > alert_ptr_t
std::shared_ptr< message > message_ptr_t
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
boost::uuids::uuid uuid_t
Universally-unique-identifier type containing 16 bytes (written as 32 hexadecimal characters).