8 #define DRIPLINE_API_EXPORTS
16 #include "signal_handler.hh"
23 monitor::monitor(
const scarab::param_node& a_config,
const scarab::authentication& a_auth ) :
25 core( a_config[
"dripline_mesh"].as_node(), a_auth ),
27 f_status(
status::nothing ),
29 f_json_print( false ),
30 f_pretty_print( false ),
35 if( a_config.has(
"request_keys" ) && a_config[
"request_keys"].is_array() )
37 const scarab::param_array& t_req_keys = a_config[
"request_keys"].as_array();
38 f_requests_keys.reserve( t_req_keys.size() );
39 for(
auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
41 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << (*t_it)().as_string() <<
"> on the requests exchange" );
42 f_requests_keys.push_back( (*t_it)().as_string() );
46 if( a_config.has(
"request_key" ) && a_config[
"request_key"].is_value() )
48 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << a_config[
"request_key"]().as_string() <<
"> on the requests exchange" );
49 f_requests_keys.push_back( a_config[
"request_key"]().as_string() );
53 if( a_config.has(
"alert_keys" ) && a_config[
"alert_keys"].is_array() )
55 const scarab::param_array& t_req_keys = a_config[
"alert_keys"].as_array();
56 f_requests_keys.reserve( t_req_keys.size() );
57 for(
auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
59 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << (*t_it)().as_string() <<
"> on the alerts exchange" );
60 f_alerts_keys.push_back( (*t_it)().as_string() );
64 if( a_config.has(
"alert_key" ) && a_config[
"alert_key"].is_value() )
66 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << a_config[
"alert_key"]().as_string() <<
"> on the alerts exchange" );
67 f_alerts_keys.push_back( a_config[
"alert_key"]().as_string() );
76 std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
85 LERROR(
dlog,
"Monitor is not in the right status to start" );
89 if( f_requests_keys.empty() && f_alerts_keys.empty() )
91 LERROR(
dlog,
"No keys provided to monitor" );
95 LINFO(
dlog,
"Connecting to <" << f_address <<
":" << f_port <<
">" );
97 LDEBUG(
dlog,
"Opening channel for message monitor <" << f_name <<
">" );
99 if( ! f_channel )
return false;
102 if( !
setup_exchange( f_channel, f_requests_exchange ) )
return false;
103 if( !
setup_exchange( f_channel, f_alerts_exchange ) )
return false;
106 LDEBUG(
dlog,
"Setting up queue for message monitor <" << f_name <<
">" );
107 if( !
setup_queue( f_channel, f_name ) )
return false;
114 if( f_consumer_tag.empty() )
return false;
122 auto t_cancel_wrap = scarab::wrap_cancelable( *
this );
123 scarab::signal_handler::add_cancelable( t_cancel_wrap );
127 LERROR(
dlog,
"Monitor is not in the right status to listen" );
139 throw dripline_error() <<
"Something went wrong while listening for messages";
142 f_receiver_thread.join();
144 catch( std::system_error& e )
146 LERROR(
dlog,
"Could not start the a thread due to a system error: " << e.what() );
151 LERROR(
dlog,
"Dripline error while running monitor: " << e.what() );
154 catch( std::exception& e )
156 LERROR(
dlog,
"Error while running monitor: " << e.what() );
166 LINFO(
dlog,
"Stopping message monitor <" << f_name <<
">" );
191 LDEBUG(
dlog,
"Binding request keys for message monitor <" << f_name <<
">" );
192 for(
auto t_req_key_it = f_requests_keys.begin(); t_req_key_it != f_requests_keys.end(); ++t_req_key_it )
194 if( !
bind_key( f_channel, f_requests_exchange, f_name, *t_req_key_it ) )
return false;
197 LDEBUG(
dlog,
"Binding alerts keys for message monitor <" << f_name <<
">" );
198 for(
auto t_al_key_it = f_alerts_keys.begin(); t_al_key_it != f_alerts_keys.end(); ++t_al_key_it )
200 if( !
bind_key( f_channel, f_alerts_exchange, f_name, *t_al_key_it ) )
return false;
208 LINFO(
dlog,
"Listening for incoming messages on <" << f_name <<
">" );
210 while( ! is_canceled() )
216 if( f_canceled.load() )
218 LDEBUG(
dlog,
"Monitor <" << f_name <<
"> canceled" );
230 LWARN(
dlog,
"A soft error ocurred while listening for messages for monitor <" << f_name <<
">. The channel is still valid" );
236 LERROR(
dlog,
"A hard error ocurred while listening for messages for monitor <" << f_name <<
">. The channel is no longer valid" );
242 LERROR(
dlog,
"An unknown status occurred while listening for messages for monitor <" << f_name <<
">" );
250 if( f_canceled.load() )
252 LDEBUG(
dlog,
"Monitor <" << f_name <<
"> canceled" );
263 if( ! f_json_print && ! f_pretty_print )
265 if( a_message->is_request() )
267 LPROG(
dlog, *std::static_pointer_cast< msg_request >( a_message ) );
270 if( a_message->is_reply() )
272 LPROG(
dlog, *std::static_pointer_cast< msg_reply >( a_message ) );
275 if( a_message->is_alert() )
277 LPROG(
dlog, *std::static_pointer_cast< msg_alert >( a_message ) );
280 LPROG(
dlog, *a_message );
285 scarab::param_node t_encoding_options;
288 t_encoding_options.add(
"style",
"pretty" );
290 std::string t_encoded_message = a_message->encode_full_message( 5000, t_encoding_options );
291 LPROG(
dlog, t_encoded_message );
297 LERROR(
dlog,
"<" << f_name <<
"> Dripline exception caught while handling message: " << e.what() );
299 catch( std::exception& e )
301 LERROR(
dlog,
"<" << f_name <<
"> Standard exception caught while handling message: " << e.what() );
void execute()
Handles messages that appear in the concurrent queue by calling submit_message().
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
listen for a single AMQP message
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
amqp_channel_ptr open_channel() const
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Dripline-specific errors.
Convenience class to bring together listener and concurrent_receiver.
bool stop()
Stops listening for messages and closes the AMQP connection.
bool listen()
Starts actively listening for and handling messages (blocking).
bool start()
Opens the AMQP connection, binds keys, and starts consuming.
virtual void submit_message(message_ptr_t a_message)
monitor(const scarab::param_node &a_config, const scarab::authentication &a_auth)
virtual bool listen_on_queue()
void handle_message_chunk(amqp_envelope_ptr a_envelope)
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("monitor", __FILE_NAME__, __LINE__)
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
std::shared_ptr< message > message_ptr_t
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
AmqpClient::Envelope::ptr_t amqp_envelope_ptr