Dripline-Cpp  v2.10.11
Dripline Implementation in C++
heartbeater.cc
Go to the documentation of this file.
1 /*
2  * heartbeat.cc
3  *
4  * Created on: Apr 26, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "heartbeater.hh"
11 
12 #include "message.hh"
13 #include "service.hh"
14 
15 #include "logger.hh"
16 #include "param_node.hh"
17 
18 #include <iomanip>
19 
20 LOGGER( dlog, "heartbeater" );
21 
22 namespace dripline
23 {
24 
26  cancelable(),
27  f_heartbeat_interval_s( 60 ),
28  f_check_timeout_ms( 1000 ),
29  f_service( a_service ),
30 // f_stop( false ),
31  f_heartbeat_thread()
32  {}
33 
35  {
36  cancelable::operator=( std::move(a_orig) );
37  f_heartbeat_interval_s = a_orig.f_heartbeat_interval_s;
38  f_check_timeout_ms = a_orig.f_check_timeout_ms;
39  f_service = std::move( a_orig.f_service );
40  f_heartbeat_thread = std::move( a_orig.f_heartbeat_thread );
41  return *this;
42  }
43 
44  void heartbeater::execute( const std::string& a_name, uuid_t a_id, const std::string& a_routing_key )
45  {
46  if( ! f_service )
47  {
48  throw dripline_error() << "Unable to start heartbeater because service pointer is not set";
49  }
50 
51  if( f_heartbeat_interval_s == 0 )
52  {
53  LINFO( dlog, "Heartbeat disabled" );
54  return;
55  }
56 
57  scarab::param_ptr_t t_payload_ptr( new scarab::param_node() );
58  scarab::param_node& t_payload = t_payload_ptr->as_node();
59  t_payload.add( "name", a_name );
60  t_payload.add( "id", string_from_uuid(a_id) );
61 
62  routing_key t_key;
63  t_key.push_back( a_routing_key );
64  t_key.push_back( a_name );
65 
66  alert_ptr_t t_alert_ptr = msg_alert::create( std::move(t_payload_ptr), t_key.to_string() );
67 
68  LINFO( dlog, "Starting heartbeat loop" );
69 
70  auto t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
71  while( ! f_canceled.load() )
72  {
73  // wait the interval
74  std::this_thread::sleep_for( std::chrono::milliseconds( f_check_timeout_ms ) );
75 
76  // send the message
77  if( std::chrono::steady_clock().now() >= t_next_heartbeat_at && ! f_canceled.load() )
78  {
79  LDEBUG( dlog, "Sending heartbeat" );
80  // reset message ID so it's different for each heartbeat
81  t_alert_ptr->message_id() = string_from_uuid( generate_random_uuid() );
82 
83  sent_msg_pkg_ptr t_receive_reply;
84  try
85  {
86  t_receive_reply = f_service->send( t_alert_ptr );
87 
88  if( ! t_receive_reply->f_successful_send )
89  {
90  LERROR( dlog, "Failed to send reply:\n" + t_receive_reply->f_send_error_message );
91  }
92  }
93  catch( message_ptr_t )
94  {
95  LWARN( dlog, "Operating in offline mode; message not sent" );
96  }
97  catch( connection_error& e )
98  {
99  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
100  }
101  catch( dripline_error& e )
102  {
103  LERROR( dlog, "Dripline error while sending reply:\n" << e.what() );
104  }
105 
106  t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
107  }
108  }
109 
110  return;
111  }
112 
113 } /* namespace dripline */
114 
115 
Error indicating a problem with the connection to the broker.
Dripline-specific errors.
A heartbeater repeatedly sends an alert on a particular time interval.
Definition: heartbeater.hh:53
heartbeater & operator=(const heartbeater &)=delete
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
Definition: heartbeater.cc:44
std::thread f_heartbeat_thread
Definition: heartbeater.hh:83
heartbeater(service *a_service)
Primary constructor. A service pointer is required to be able to send messages.
Definition: heartbeater.cc:25
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
Definition: message.cc:543
Parses routing keys and stores the tokenized information.
Definition: specifier.hh:28
std::string to_string() const
Converts the routing-key tokens into a single string.
Definition: specifier.cc:33
Primary unit of software that connects to a broker and typically provides an interface with an instru...
Definition: service.hh:85
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("heartbeater", __FILE_NAME__, __LINE__)
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:78
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
boost::uuids::uuid uuid_t
Universally-unique-identifier type containing 16 hexadecimal characters.
Definition: uuid.hh:26