8 #define DRIPLINE_API_EXPORTS
21 #include "param_codec.hh"
23 #include "signal_handler.hh"
34 using scarab::param_array;
35 using scarab::param_node;
36 using scarab::param_ptr_t;
37 using scarab::param_value;
44 f_is_dry_run( false ),
51 f_suppress_output( false ),
52 f_json_print( false ),
53 f_pretty_print( false ),
62 const scarab::param_array a_ord_args;
63 execute( a_config, a_ord_args, a_auth );
66 void agent::sub_agent::execute(
const scarab::param_node& a_config,
const scarab::param_array& a_ord_args,
const scarab::authentication& a_auth )
68 LINFO(
dlog,
"Creating message" );
71 param_node t_config( a_config );
73 param_node t_dripline_node;
74 if( t_config.has(
"dripline_mesh" ) )
76 t_dripline_node = std::move(t_config.remove(
"dripline_mesh" )->as_node());
79 core t_core( t_dripline_node, a_auth );
81 t_config.remove(
"auth_file" );
82 t_config.remove(
"auth_groups" );
84 f_agent->set_timeout( t_config.get_value(
"timeout", 10U ) * 1000 );
85 t_config.erase(
"timeout" );
86 f_agent->set_json_print( t_config.get_value(
"json_print", f_agent->get_json_print() ) );
87 t_config.erase(
"json_print" );
88 f_agent->set_pretty_print( t_config.get_value(
"pretty_print", f_agent->get_pretty_print() ) );
89 t_config.erase(
"pretty_print" );
90 f_agent->set_suppress_output( t_config.get_value(
"suppress_output", f_agent->get_suppress_output() ) );
91 t_config.erase(
"suppress_output" );
93 f_agent->routing_key() = t_config.get_value(
"rk", f_agent->routing_key() );
94 t_config.erase(
"rk" );
96 f_agent->specifier() = t_config.get_value(
"specifier", f_agent->specifier() );
97 t_config.erase(
"specifier" );
99 if( t_config.has(
"lockout_key" ) )
101 bool t_lk_valid =
true;
103 t_config.erase(
"lockout_key" );
106 LERROR(
dlog,
"Invalid lockout key provided: <" << t_config.get_value(
"lockout_key",
"" ) <<
">" );
112 if( t_config.has(
"return" ) )
114 f_agent->set_return_code( t_config[
"return"].as_node().get_value(
"code",
dl_success().rc_value() ) );
115 f_agent->return_message() = t_config[
"return"].as_node().get_value(
"message",
"" );
116 t_config.erase(
"return" );
119 f_agent->save_filename() = t_config.get_value(
"save",
"" );
120 t_config.erase(
"save" );
123 scarab::param_array t_values;
124 if( t_config.has(
"values" ) )
126 t_values.merge( t_config[
"values"].as_array() );
127 t_config.erase(
"values" );
129 t_values.merge( a_ord_args );
130 if( t_config.has(
"option_values" ) )
132 t_values.merge( t_config[
"option_values"].as_array() );
133 t_config.erase(
"option_values" );
135 if( ! t_values.empty() )
137 t_config.add(
"values", t_values );
141 if( t_config.has(
"dry_run_msg" ) )
143 f_agent->set_is_dry_run( t_config[
"dry_run_msg"]().as_bool() );
144 t_config.erase(
"dry_run_msg" );
147 this->create_and_send_message( t_config, t_core );
156 LDEBUG(
dlog,
"message payload to send is: " << t_request->payload() );
160 LERROR(
dlog,
"Unable to create request" );
166 if( f_agent->get_is_dry_run() )
168 LPROG(
dlog,
"Request (routing key = " << f_agent->routing_key() <<
"; specifier = " << f_agent->specifier() <<
"):\n" << *t_request );
175 t_request->lockout_key() = f_agent->lockout_key();
177 LINFO(
dlog,
"Sending message w/ message_operation = " << t_request->get_message_operation() <<
" to " << t_request->routing_key() );
178 LDEBUG(
dlog,
"Message headers:\n" << t_request->get_message_param(
false ) );
183 t_receive_reply = a_core.
send( t_request );
187 LWARN(
dlog,
"Operating in offline mode; message not sent" );
193 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
199 LERROR(
dlog,
"Unable to send request:\n" << e.what() );
204 if( ! t_receive_reply->f_successful_send )
206 LERROR(
dlog,
"Unable to send request:\n" + t_receive_reply->f_send_error_message );
211 if( ! t_receive_reply->f_consumer_tag.empty() )
213 LINFO(
dlog,
"Waiting for a reply from the server; use ctrl-c to cancel" );
218 auto t_rec_cancel_wrap = wrap_cancelable( t_msg_receiver );
219 scarab::signal_handler::add_cancelable( t_rec_cancel_wrap );
222 if( t_msg_receiver.is_canceled() )
224 LDEBUG(
dlog,
"Agent canceled while waiting for reply" );
225 f_agent->set_return(
dl_success().rc_value() );
229 LINFO(
dlog,
"Response received" );
230 f_agent->set_return( t_reply->get_return_code() );
232 const param& t_payload = t_reply->payload();
234 LPROG(
dlog,
"Response:\n" <<
235 "Return code: " << t_reply->get_return_code() <<
'\n' <<
236 "Return message: " << t_reply->return_message() <<
'\n' <<
239 if( ! f_agent->get_suppress_output() )
241 if( ! f_agent->get_json_print() && ! f_agent->get_pretty_print() )
243 std::cout << *t_reply << std::endl;
247 param_node t_encoding_options;
248 if( f_agent->get_pretty_print() )
250 t_encoding_options.add(
"style",
"pretty" );
252 std::string t_encoded_message = t_reply->encode_full_message( 5000, t_encoding_options );
253 std::cout << t_encoded_message << std::endl;
257 if( ! f_agent->save_filename().empty() && ! t_payload.is_null() )
259 scarab::param_translator t_translator;
260 if( ! t_translator.write_file( t_payload, f_agent->save_filename() ) )
262 LERROR(
dlog,
"Unable to write out payload" );
271 LWARN(
dlog,
"Timed out or while waiting for reply" );
278 LERROR(
dlog,
"Error while waiting for reply" );
282 LERROR(
dlog,
"Unknown state while waiting for reply: " << (
int)t_post_listen_status );
287 f_agent->set_reply( t_reply );
291 f_agent->set_return(
dl_success().rc_value() );
300 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
303 f_agent->return_message(),
304 std::move(t_payload_ptr),
305 f_agent->routing_key(),
306 f_agent->specifier() );
307 LDEBUG(
dlog,
"reply payload to send is: " << t_reply->payload() );
311 LERROR(
dlog,
"Unable to create reply" );
317 if( f_agent->get_is_dry_run() )
319 LPROG(
dlog,
"Reply (routing key = " << f_agent->routing_key() <<
"; specifier = " << f_agent->specifier() <<
"):\n" << *t_reply );
324 LINFO(
dlog,
"Sending reply with return code <" << t_reply->get_return_code() <<
"> and message <" << t_reply->return_message() <<
"> to key " << t_reply->routing_key() );
325 LDEBUG(
dlog,
"Message headers:\n" << t_reply->get_message_param(
false ) );
330 t_msg_sent = a_core.
send( t_reply );
334 LWARN(
dlog,
"Operating in offline mode; message not sent" );
340 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
346 LERROR(
dlog,
"Unable to send reply:\n" << e.what() );
351 if( ! t_msg_sent->f_successful_send )
353 LERROR(
dlog,
"Unable to send reply:\n" + t_msg_sent->f_send_error_message );
358 f_agent->set_return(
dl_success().rc_value() );
367 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
370 f_agent->routing_key(),
371 f_agent->specifier() );
372 LDEBUG(
dlog,
"alert payload to send is: " << t_alert->payload() );
376 LERROR(
dlog,
"Unable to create alert" );
382 if( f_agent->get_is_dry_run() )
384 LPROG(
dlog,
"Alert (routing key = " << f_agent->routing_key() <<
"; specifier = " << f_agent->specifier() <<
"):\n" << *t_alert );
389 LINFO(
dlog,
"Sending alert with key " << t_alert->routing_key() );
390 LDEBUG(
dlog,
"Message headers:\n" << t_alert->get_message_param(
false ) );
395 t_msg_sent = a_core.
send( t_alert );
399 LWARN(
dlog,
"Operating in offline mode; message not sent" );
405 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
411 LERROR(
dlog,
"Unable to send alert:\n" << e.what() );
416 if( ! t_msg_sent->f_successful_send )
418 LERROR(
dlog,
"Unable to send alert:\n" + t_msg_sent->f_send_error_message );
423 f_agent->set_return(
dl_success().rc_value() );
431 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
435 f_agent->routing_key(),
436 f_agent->specifier() );
442 if( ! a_config.has(
"values" ) )
444 LERROR(
dlog,
"No \"values\" option given" );
448 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
452 f_agent->routing_key(),
453 f_agent->specifier() );
458 param_ptr_t t_payload_ptr(
new param_node() );
459 param_node& t_payload_node = t_payload_ptr->as_node();
462 if( a_config.has(
"load" ) )
464 if( ! a_config[
"load"].as_node().has(
"json" ) )
466 LERROR(
dlog,
"Load instruction did not contain a valid file type");
470 std::string t_load_filename( a_config[
"load"]().as_string() );
471 scarab::param_translator t_translator;
472 scarab::param_ptr_t t_node_from_file = t_translator.read_file( t_load_filename );
473 if( t_node_from_file ==
nullptr || ! t_node_from_file->is_node() )
475 LERROR(
dlog,
"Unable to read JSON file <" << t_load_filename <<
">" );
479 t_payload_node.merge( t_node_from_file->as_node() );
480 a_config.erase(
"load" );
484 t_payload_node.merge( a_config );
488 f_agent->routing_key(),
489 f_agent->specifier() );
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
virtual request_ptr_t create_request(scarab::param_node &a_config)
virtual request_ptr_t create_request(scarab::param_node &a_config)
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
virtual request_ptr_t create_request(scarab::param_node &a_config)
void execute(const scarab::param_node &a_config, const scarab::authentication &a_auth)
Error indicating a problem with the connection to the broker.
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ hard_error
An error occurred, and the channel is no longer valid.
Dripline-specific errors.
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline me...
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
std::shared_ptr< msg_alert > alert_ptr_t
std::shared_ptr< message > message_ptr_t
uuid_t uuid_from_string(const std::string &a_id_str)
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
std::shared_ptr< msg_reply > reply_ptr_t
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.