Dripline-Cpp  v2.10.11
Dripline Implementation in C++
monitor.cc
Go to the documentation of this file.
1 /*
2  * monitor.cc
3  *
4  * Created on: Jul 1, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "monitor.hh"
11 
12 #include "dripline_exceptions.hh"
13 #include "uuid.hh"
14 
15 #include "logger.hh"
16 #include "signal_handler.hh"
17 
18 LOGGER( dlog, "monitor" );
19 
20 namespace dripline
21 {
22 
23  monitor::monitor( const scarab::param_node& a_config, const scarab::authentication& a_auth ) :
24  scarab::cancelable(),
25  core( a_config["dripline_mesh"].as_node(), a_auth ),
27  f_status( status::nothing ),
28  f_name( std::string("monitor_") + string_from_uuid(generate_random_uuid()) ),
29  f_json_print( false ),
30  f_pretty_print( false ),
31  f_requests_keys(),
32  f_alerts_keys()
33  {
34  // get requests keys
35  if( a_config.has( "request_keys" ) && a_config["request_keys"].is_array() )
36  {
37  const scarab::param_array& t_req_keys = a_config["request_keys"].as_array();
38  f_requests_keys.reserve( t_req_keys.size() );
39  for( auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
40  {
41  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << (*t_it)().as_string() << "> on the requests exchange" );
42  f_requests_keys.push_back( (*t_it)().as_string() );
43  }
44  }
45 
46  if( a_config.has( "request_key" ) && a_config["request_key"].is_value() )
47  {
48  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << a_config["request_key"]().as_string() << "> on the requests exchange" );
49  f_requests_keys.push_back( a_config["request_key"]().as_string() );
50  }
51 
52  // get alerts keys
53  if( a_config.has( "alert_keys" ) && a_config["alert_keys"].is_array() )
54  {
55  const scarab::param_array& t_req_keys = a_config["alert_keys"].as_array();
56  f_requests_keys.reserve( t_req_keys.size() );
57  for( auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
58  {
59  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << (*t_it)().as_string() << "> on the alerts exchange" );
60  f_alerts_keys.push_back( (*t_it)().as_string() );
61  }
62  }
63 
64  if( a_config.has( "alert_key" ) && a_config["alert_key"].is_value() )
65  {
66  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << a_config["alert_key"]().as_string() << "> on the alerts exchange" );
67  f_alerts_keys.push_back( a_config["alert_key"]().as_string() );
68  }
69  }
70 
72  {
73  if( f_status >= status::listening )
74  {
75  this->cancel( dl_success().rc_value() );
76  std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
77  }
78  if( f_status > status::exchange_declared ) stop();
79  }
80 
82  {
83  if( f_status != status::nothing )
84  {
85  LERROR( dlog, "Monitor is not in the right status to start" );
86  return false;
87  }
88 
89  if( f_requests_keys.empty() && f_alerts_keys.empty() )
90  {
91  LERROR( dlog, "No keys provided to monitor" );
92  return false;
93  }
94 
95  LINFO( dlog, "Connecting to <" << f_address << ":" << f_port << ">" );
96 
97  LDEBUG( dlog, "Opening channel for message monitor <" << f_name << ">" );
98  f_channel = open_channel();
99  if( ! f_channel ) return false;
100  f_status = status::channel_created;
101 
102  if( ! setup_exchange( f_channel, f_requests_exchange ) ) return false;
103  if( ! setup_exchange( f_channel, f_alerts_exchange ) ) return false;
104  f_status = status::exchange_declared;
105 
106  LDEBUG( dlog, "Setting up queue for message monitor <" << f_name << ">" );
107  if( ! setup_queue( f_channel, f_name ) ) return false;
108  f_status = status::queue_declared;
109 
110  if( ! bind_keys() ) return false;
111  f_status = status::queue_bound;
112 
113  f_consumer_tag = start_consuming( f_channel, f_name );
114  if( f_consumer_tag.empty() ) return false;
115  f_status = status::consuming;
116 
117  return true;
118  }
119 
121  {
122  auto t_cancel_wrap = scarab::wrap_cancelable( *this );
123  scarab::signal_handler::add_cancelable( t_cancel_wrap );
124 
125  if( f_status != status::consuming )
126  {
127  LERROR( dlog, "Monitor is not in the right status to listen" );
128  return false;
129  }
130 
131  f_status = status::listening;
132 
133  try
134  {
135  f_receiver_thread = std::thread( &concurrent_receiver::execute, this );
136 
137  if( ! listen_on_queue() )
138  {
139  throw dripline_error() << "Something went wrong while listening for messages";
140  }
141 
142  f_receiver_thread.join();
143  }
144  catch( std::system_error& e )
145  {
146  LERROR( dlog, "Could not start the a thread due to a system error: " << e.what() );
147  return false;
148  }
149  catch( dripline_error& e )
150  {
151  LERROR( dlog, "Dripline error while running monitor: " << e.what() );
152  return false;
153  }
154  catch( std::exception& e )
155  {
156  LERROR( dlog, "Error while running monitor: " << e.what() );
157  return false;
158  }
159 
160  return true;
161 
162  }
163 
165  {
166  LINFO( dlog, "Stopping message monitor <" << f_name << ">" );
167 
168  if( f_status >= status::listening ) // listening
169  {
170  this->cancel( dl_success().rc_value() );
171  f_status = status::consuming;
172  }
173 
174  if( f_status >= status::queue_bound ) // queue_bound or consuming
175  {
176  if( ! stop_consuming( f_channel, f_consumer_tag ) ) return false;
177  f_status = status::queue_bound;
178  }
179 
180  if( f_status >= status::queue_declared ) // queue_declared or queue_bound
181  {
182  if( ! remove_queue( f_channel, f_name ) ) return false;
183  f_status = status::exchange_declared;
184  }
185 
186  return true;
187  }
188 
190  {
191  LDEBUG( dlog, "Binding request keys for message monitor <" << f_name << ">" );
192  for( auto t_req_key_it = f_requests_keys.begin(); t_req_key_it != f_requests_keys.end(); ++t_req_key_it )
193  {
194  if( ! bind_key( f_channel, f_requests_exchange, f_name, *t_req_key_it ) ) return false;
195  }
196 
197  LDEBUG( dlog, "Binding alerts keys for message monitor <" << f_name << ">" );
198  for( auto t_al_key_it = f_alerts_keys.begin(); t_al_key_it != f_alerts_keys.end(); ++t_al_key_it )
199  {
200  if( ! bind_key( f_channel, f_alerts_exchange, f_name, *t_al_key_it ) ) return false;
201  }
202 
203  return true;
204  }
205 
207  {
208  LINFO( dlog, "Listening for incoming messages on <" << f_name << ">" );
209 
210  while( ! is_canceled() )
211  {
212  amqp_envelope_ptr t_envelope;
214  core::listen_for_message( t_envelope, t_post_listen_status, f_channel, f_consumer_tag, f_listen_timeout_ms );
215 
216  if( f_canceled.load() )
217  {
218  LDEBUG( dlog, "Monitor <" << f_name << "> canceled" );
219  return true;
220  }
221 
222  if( t_post_listen_status == core::post_listen_status::timeout )
223  {
224  // we end up here every time the listen times out with no message received
225  continue;
226  }
227 
228  if( t_post_listen_status == core::post_listen_status::soft_error )
229  {
230  LWARN( dlog, "A soft error ocurred while listening for messages for monitor <" << f_name << ">. The channel is still valid" );
231  continue;
232  }
233 
234  if( t_post_listen_status == core::post_listen_status::hard_error )
235  {
236  LERROR( dlog, "A hard error ocurred while listening for messages for monitor <" << f_name << ">. The channel is no longer valid" );
237  return false;
238  }
239 
240  if( t_post_listen_status == core::post_listen_status::unknown )
241  {
242  LERROR( dlog, "An unknown status occurred while listening for messages for monitor <" << f_name << ">" );
243  return false;
244  }
245 
246  // remaining status is core::post_listen_status::message_received
247 
248  handle_message_chunk( t_envelope );
249 
250  if( f_canceled.load() )
251  {
252  LDEBUG( dlog, "Monitor <" << f_name << "> canceled" );
253  return true;
254  }
255  }
256  return true;
257  }
258 
260  {
261  try
262  {
263  if( ! f_json_print && ! f_pretty_print )
264  {
265  if( a_message->is_request() )
266  {
267  LPROG( dlog, *std::static_pointer_cast< msg_request >( a_message ) );
268  return;
269  }
270  if( a_message->is_reply() )
271  {
272  LPROG( dlog, *std::static_pointer_cast< msg_reply >( a_message ) );
273  return;
274  }
275  if( a_message->is_alert() )
276  {
277  LPROG( dlog, *std::static_pointer_cast< msg_alert >( a_message ) );
278  return;
279  }
280  LPROG( dlog, *a_message );
281  return;
282  }
283  else
284  {
285  scarab::param_node t_encoding_options;
286  if( f_pretty_print )
287  {
288  t_encoding_options.add( "style", "pretty" );
289  }
290  std::string t_encoded_message = a_message->encode_full_message( 5000, t_encoding_options );
291  LPROG( dlog, t_encoded_message );
292  return;
293  }
294  }
295  catch( dripline_error& e )
296  {
297  LERROR( dlog, "<" << f_name << "> Dripline exception caught while handling message: " << e.what() );
298  }
299  catch( std::exception& e )
300  {
301  LERROR( dlog, "<" << f_name << "> Standard exception caught while handling message: " << e.what() );
302  }
303 
304  return;
305  }
306 
307 } /* namespace dripline */
void execute()
Handles messages that appear in the concurrent queue by calling submit_message().
Definition: receiver.cc:429
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:75
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
Definition: core.cc:411
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
Definition: core.cc:360
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
listen for a single AMQP message
Definition: core.cc:528
post_listen_status
Definition: core.hh:80
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:437
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
Definition: core.cc:462
amqp_channel_ptr open_channel() const
Definition: core.cc:277
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:385
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:498
Dripline-specific errors.
Convenience class to bring together listener and concurrent_receiver.
Definition: listener.hh:77
virtual ~monitor()
Definition: monitor.cc:71
bool stop()
Stops listening for messages and closes the AMQP connection.
Definition: monitor.cc:164
bool listen()
Starts actively listening for and handling messages (blocking).
Definition: monitor.cc:120
bool start()
Opens the AMQP connection, binds keys, and starts consuming.
Definition: monitor.cc:81
virtual void submit_message(message_ptr_t a_message)
Definition: monitor.cc:259
monitor(const scarab::param_node &a_config, const scarab::authentication &a_auth)
Definition: monitor.cc:23
bool bind_keys()
Definition: monitor.cc:189
virtual bool listen_on_queue()
Definition: monitor.cc:206
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Definition: receiver.cc:62
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("monitor", __FILE_NAME__, __LINE__)
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:78
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
Definition: agent.hh:18